# Python for DE - Extract and Load

## Table of Contents
##### 1. Types of Systems Python can Connect to for Read and Write
##### 2. Exercise Questions

## Types of Systems Python can Connect to for Read and Write
1. **Database Drivers**
    - A database driver is a Python library that opens a connection to a database (using the required credentials) and runs queries to read or write data
    - Examples include: `sqlite3`, `psycopg2` (Postgres), `duckdb`

2. **Cloud SDKs**
    - Most cloud providers publish their own SDK (software development kit) for working with their services
    - Examples include `boto3` for AWS and `google-cloud-storage` for GCP (`gsutil` is GCP's command-line tool, not a Python SDK)
    - The SDKs can be used to extract data from, or load data into, cloud storage such as AWS S3 buckets or Google Cloud Storage

3. **APIs**
    - Some systems expose their data via APIs
    - You can make HTTP(S) requests with the third-party `requests` library to work with them

4. **Files**
    - Python can read many file formats, such as JSON, XML, Parquet and CSV (some, like Parquet, need a third-party library such as `pyarrow`)

5. **SFTP/FTP**
    - FTP: File Transfer Protocol
    - SFTP: SSH File Transfer Protocol, sometimes called Secure FTP
    - Both are used to work with files on a remote server
    - You can browse the server's directories and upload or download files between it and your local computer
    - The key difference is that FTP sends data (including credentials) unencrypted, while SFTP runs over an encrypted SSH connection

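6. **Queuing Systems**
    - Queuing and streaming systems such as Kafka buffer data between producers and consumers; Python has client libraries (e.g. `confluent-kafka`) to read from and write to them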
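To make the **Files** item concrete, here is a minimal stdlib-only sketch (not part of the original notebook) that writes the same records to JSON and CSV and reads them back. The sample records and the file names `customers.json` / `customers.csv` are hypothetical, and a temporary directory is used so nothing is left on disk.

```python
import csv
import json
import tempfile
from pathlib import Path

# Hypothetical sample records (not read from the notebook's dataset)
records = [
    {"customer_id": 1, "city": "franca", "state_code": "SP"},
    {"customer_id": 2, "city": "campinas", "state_code": "SP"},
]

with tempfile.TemporaryDirectory() as tmp:
    json_path = Path(tmp) / "customers.json"
    csv_path = Path(tmp) / "customers.csv"

    # Load: write the records as JSON and as CSV (with a header row)
    json_path.write_text(json.dumps(records), encoding="utf-8")
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=list(records[0]))
        writer.writeheader()
        writer.writerows(records)

    # Extract: read both files back
    from_json = json.loads(json_path.read_text(encoding="utf-8"))
    with open(csv_path, newline="", encoding="utf-8") as f:
        from_csv = list(csv.DictReader(f))

print(from_json[0])  # {'customer_id': 1, 'city': 'franca', 'state_code': 'SP'}
print(from_csv[0])   # {'customer_id': '1', 'city': 'franca', 'state_code': 'SP'}
```

Note the difference in the last two lines: JSON keeps `customer_id` as an integer, while CSV has no types, so every value comes back as a string and must be cast before loading into a typed table.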

```python
import pandas as pd

url = "https://github.com/josephmachado/python_essentials_for_data_engineers/raw/refs/heads/main/data/customers.csv"
df_customers = pd.read_csv(url)
df_customers.head()
```

|   | customer_id | zipcode | city | state_code | datetime_created | datetime_updated |
|---|---|---|---|---|---|---|
| 0 | 1 | 14409 | franca | SP | 2017-10-18 00:00:00 | 2017-10-18 00:00:00 |
| 1 | 2 | 9790 | sao bernardo do campo | SP | 2017-10-18 00:00:00 | 2017-10-18 00:00:00 |
| 2 | 3 | 1151 | sao paulo | SP | 2017-10-18 00:00:00 | 2017-10-18 00:00:00 |
| 3 | 4 | 8775 | mogi das cruzes | SP | 2017-10-18 00:00:00 | 2017-10-18 00:00:00 |
| 4 | 5 | 13056 | campinas | SP | 2017-10-18 00:00:00 | 2017-10-18 00:00:00 |


## Exercise Questions

```python
# Extract: the process of pulling data from a source system
# Load: the process of writing data to a destination system

# Common upstream & downstream systems
# OLTP databases: Postgres, MySQL, SQLite, etc.
# OLAP databases: Snowflake, BigQuery, ClickHouse, DuckDB, etc.
# Cloud data storage: AWS S3, Google Cloud Storage, MinIO, etc.
# Queue systems: Kafka, Redpanda, etc.
# APIs
# Local disk: CSV, Excel, JSON, XML files
# SFTP/FTP servers

# Databases: to read from or write to a database we use a database driver,
# a library that manages the connection and executes queries.
# Question: How do you read data from a sqlite3 database and write it to a DuckDB database?
# Hint: Import the sqlite3 and duckdb libraries and create a connection to each database
# Hint: Commit and close the connections.
#   Commit tells the connection to persist pending changes; without it, inserted data is not saved.
#   Close connections when you are done, as DB connections are expensive to keep open.

import sqlite3

import duckdb
import pandas as pd

# Connect to a SQLite DB (the file is created if it does not exist)
conn = sqlite3.connect("customers.db")

# Seed the source: write df_customers into a SQLite "customers" table
df_customers.to_sql("customers", conn, if_exists="replace", index=False)
conn.commit()

# 1. Extract: read the SQLite customers table into a pandas DataFrame.
#    (Row-by-row INSERT statements would also work but are slower;
#    DuckDB can read pandas DataFrames natively, so we use that instead.)
df_customers_sql = pd.read_sql_query("SELECT * FROM customers", conn)

# 2. Create or open the target DuckDB database
duckdb_conn = duckdb.connect("customers.duckdb")

# 3. Load: register the DataFrame as a view and create the table from it
#    (IF NOT EXISTS: re-running the cell will not reload the data)
duckdb_conn.register("df_customers_sql", df_customers_sql)
duckdb_conn.execute("CREATE TABLE IF NOT EXISTS customers AS SELECT * FROM df_customers_sql")
duckdb_conn.commit()  # optional: DuckDB auto-commits each statement by default

# Close connections
conn.close()
duckdb_conn.close()
```
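The cell above loads through a pandas DataFrame. When the target has no DataFrame integration, a driver-agnostic alternative is to stream rows in batches with the DB-API methods `fetchmany` and `executemany`, so the whole table never sits in memory at once. It is slower than DuckDB's native DataFrame path, but works with any DB-API driver. A minimal sketch, assuming two in-memory SQLite databases as stand-ins for the source and target; `copy_table` and `BATCH_SIZE` are illustrative names, not from the notebook.

```python
import sqlite3

BATCH_SIZE = 1000  # assumption: tune to row width and available memory


def copy_table(src, dst, select_sql, insert_sql, batch_size=BATCH_SIZE):
    """Stream rows from src to dst in batches and return the number of rows copied."""
    src_cur = src.execute(select_sql)
    copied = 0
    with dst:  # sqlite3: commit on success, roll back on an exception
        while True:
            rows = src_cur.fetchmany(batch_size)
            if not rows:
                break
            dst.executemany(insert_sql, rows)
            copied += len(rows)
    return copied


# Two in-memory databases stand in for the source and target systems
src = sqlite3.connect(":memory:")
src.execute("CREATE TABLE customers (customer_id INTEGER, city TEXT)")
src.executemany("INSERT INTO customers VALUES (?, ?)", [(i, f"city_{i}") for i in range(2500)])
src.commit()

dst = sqlite3.connect(":memory:")
dst.execute("CREATE TABLE customers (customer_id INTEGER, city TEXT)")

n = copy_table(src, dst, "SELECT customer_id, city FROM customers",
               "INSERT INTO customers VALUES (?, ?)")
count = dst.execute("SELECT COUNT(*) FROM customers").fetchone()[0]
print(n, count)  # 2500 2500

src.close()
dst.close()
```

The placeholder style differs between drivers (`?` for `sqlite3` and `duckdb`, `%s` for `psycopg2`), so `insert_sql` has to match the target driver.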

```python
# Cloud storage
# Question: How do you read data from the S3 location given below and write the data to a DuckDB database?
# Data source: https://docs.opendata.aws/noaa-ghcn-pds/readme.html station data at path "csv.gz/by_station/ASN00002022.csv.gz"
# Hint: Use a boto3 client with an UNSIGNED config to access the public S3 bucket anonymously
# Hint: The file is gzip-compressed; you have to decompress it and decode it as UTF-8

import csv
import gzip

import boto3
import duckdb
from botocore import UNSIGNED
from botocore.config import Config

# AWS S3 bucket and file details
bucket_name = "noaa-ghcn-pds"
file_key = "csv.gz/by_station/ASN00002022.csv.gz"
local_path = "ASN00002022.csv.gz"

# 1. Create a boto3 client with anonymous (unsigned) access and download the file
s3_client = boto3.client("s3", config=Config(signature_version=UNSIGNED))
s3_client.download_file(bucket_name, file_key, local_path)

# 2. Decompress the gzip data and decode it as UTF-8 text ("rt" mode)
with gzip.open(local_path, "rt", encoding="utf-8") as f:
    # 3. Parse the rows with csv.reader. The reader is a lazy iterator over the
    #    open file, so materialize all rows before the `with` block closes it.
    data = list(csv.reader(f))

# 4. Connect to the DuckDB database and create the WeatherData table if needed
duckdb_conn = duckdb.connect("WeatherData.duckdb")
duckdb_conn.execute("""
    CREATE TABLE IF NOT EXISTS WeatherData (
        ID STRING,
        DATE STRING,
        ELEMENT STRING,
        VALUE INTEGER,
        M_FLAG STRING,
        Q_FLAG STRING,
        S_FLAG STRING,
        OBS_TIME STRING
    )
""")

# 5. Insert the rows into DuckDB (one parameter list per row)
duckdb_conn.executemany("INSERT INTO WeatherData VALUES (?, ?, ?, ?, ?, ?, ?, ?)", data)

# duckdb_conn.execute("FROM WeatherData LIMIT 5").fetchall()  # optional sanity check
duckdb_conn.commit()  # optional: DuckDB auto-commits each statement by default
duckdb_conn.close()
```
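The cell above downloads the file to disk first. If the file fits in memory, you can skip the local file: boto3's `s3_client.get_object(Bucket=bucket_name, Key=file_key)["Body"].read()` returns the raw gzip bytes. A minimal sketch of the decompress, decode and parse step on those bytes; `parse_gzipped_csv` is an illustrative name, and the sample rows are made up (a hypothetical stand-in for the station file) so the example runs offline.

```python
import csv
import gzip
import io


def parse_gzipped_csv(raw: bytes) -> list[list[str]]:
    """Decompress gzip bytes, decode them as UTF-8 and parse the CSV rows."""
    text = gzip.decompress(raw).decode("utf-8")
    return list(csv.reader(io.StringIO(text)))


# Hypothetical rows with the table's 8 columns (values are made up)
sample = "ASN00002022,18920101,PRCP,0,,,a,\nASN00002022,18920102,PRCP,5,,,a,\n"
raw = gzip.compress(sample.encode("utf-8"))

rows = parse_gzipped_csv(raw)
print(len(rows))  # 2
print(rows[1])    # ['ASN00002022', '18920102', 'PRCP', '5', '', '', 'a', '']
```

With the real bucket this would become `rows = parse_gzipped_csv(s3_client.get_object(Bucket=bucket_name, Key=file_key)["Body"].read())`. For large files the on-disk approach in the cell above is safer, since the whole decompressed text is held in memory here.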

```python
# API
# Question: How do you read data from the CoinCap API given below and write the data to a DuckDB database?
# URL: "https://api.coincap.io/v2/exchanges"
# Hint: use the requests library
# Hint: Ensure that the data types of the data to be inserted are compatible with DuckDB's column types in ./setup_db.py

import duckdb
import pandas as pd
import requests

# Define the API endpoint
url = "https://api.coincap.io/v2/exchanges"

# Fetch data from the CoinCap API
try:
    response = requests.get(url, timeout=30)
    response.raise_for_status()  # treat HTTP 4xx/5xx responses as errors
    data = response.json()
except requests.exceptions.RequestException as e:
    print(f"Error: {e}")
    raise  # stop here: there is no data to load

# The exchanges are under the "data" key of the JSON payload
df_response = pd.DataFrame(data["data"])

# Connect to the DuckDB database and load the DataFrame
# (DuckDB can query the local DataFrame df_response by its variable name)
conn = duckdb.connect("exchanges.db")
conn.execute("CREATE TABLE IF NOT EXISTS exchanges AS SELECT * FROM df_response")
# conn.execute("SELECT * FROM exchanges LIMIT 5").fetchall()
conn.close()
```
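The hint about data types matters because JSON APIs often send numbers as strings or `null`. Before loading into a typed table, it helps to cast each field explicitly. A minimal sketch; the records are hypothetical (field names only loosely follow an exchanges payload, values are made up), `SCHEMA` and `to_typed_row` are illustrative names, and an in-memory SQLite table stands in for DuckDB so the example runs offline.

```python
import sqlite3

# Hypothetical API records: numbers arrive as strings and some fields are null
records = [
    {"exchangeId": "alpha", "name": "Alpha", "volumeUsd": "1234.5", "tradingPairs": "42"},
    {"exchangeId": "beta", "name": "Beta", "volumeUsd": None, "tradingPairs": "7"},
]

# Assumed target schema: column name -> Python converter (not the notebook's setup_db.py)
SCHEMA = {"exchangeId": str, "name": str, "volumeUsd": float, "tradingPairs": int}


def to_typed_row(record, schema):
    """Cast each field with its converter, keeping missing or null values as None."""
    return tuple(
        None if record.get(col) is None else cast(record[col])
        for col, cast in schema.items()
    )


rows = [to_typed_row(r, SCHEMA) for r in records]
print(rows[1])  # ('beta', 'Beta', None, 7)

# Load into a typed table (in-memory SQLite stands in for DuckDB here)
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE exchanges (exchangeId TEXT, name TEXT, volumeUsd REAL, tradingPairs INTEGER)"
)
conn.executemany("INSERT INTO exchanges VALUES (?, ?, ?, ?)", rows)
total = conn.execute("SELECT SUM(tradingPairs) FROM exchanges").fetchone()[0]
print(total)  # 49
conn.close()
```

Casting in Python makes bad values fail loudly at the cast (e.g. `int("n/a")` raises `ValueError`) instead of silently landing as text in the target table.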

```python
# Local disk
# Question: How do you read a CSV file from local disk and write it to a database?
# Hint: Look up Python's built-in open() function together with csv.reader
import csv
import sqlite3

# Connect to the database (the file is created if it does not exist)
conn = sqlite3.connect("local_files.db")
cursor = conn.cursor()

# File name
file_name = "m13_arrivals_sample.csv"

# Open the CSV file (the csv module expects newline="") and read the headers
with open(file_name, "rt", newline="") as f:
    csv_reader = csv.reader(f)
    headers = next(csv_reader)  # Read the first row as headers

    # Create the table dynamically from the CSV headers (all columns as TEXT).
    # Quote each name so headers with spaces or SQL keywords are still valid.
    quoted = ['"' + col.replace('"', '""') + '"' for col in headers]
    columns = ", ".join(f"{name} TEXT" for name in quoted)
    cursor.execute(f"CREATE TABLE IF NOT EXISTS some_csv ({columns});")

    # Insert the remaining rows; csv_reader continues after the header row
    placeholders = ", ".join(["?"] * len(headers))  # e.g. "?, ?, ?" for 3 columns
    insert_query = f"INSERT INTO some_csv VALUES ({placeholders})"
    cursor.executemany(insert_query, csv_reader)  # Insert all rows

# Commit and close the connection
conn.commit()
conn.close()

print("CSV data successfully written to the database.")
```

```text
CSV data successfully written to the database.
```
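The cell above covers the read direction. For the opposite direction (loading a table to a file on local disk), a minimal sketch exports a query result to CSV, taking the header row from `cursor.description`, a standard DB-API attribute. The table's columns and rows are hypothetical, and `io.StringIO` stands in for a real file so the example runs anywhere; to write to disk, swap in `open(path, "w", newline="")`.

```python
import csv
import io
import sqlite3

# In-memory table standing in for a real source (hypothetical columns and data)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE some_csv (flight TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO some_csv VALUES (?, ?)",
    [("AB123", "on time"), ("CD456", "delayed, 20 min")],
)

cursor = conn.execute("SELECT * FROM some_csv")
headers = [col[0] for col in cursor.description]  # column names from the query

# io.StringIO stands in for open("some_csv_export.csv", "w", newline="")
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(headers)
writer.writerows(cursor)  # the cursor yields one tuple per row
conn.close()

print(buffer.getvalue())
```

`csv.writer` quotes any field that contains the delimiter, so `"delayed, 20 min"` is written with surrounding quotes and reads back as one value.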


```python
# Web scraping
# Question: Use Beautiful Soup to scrape the website below and print all the links on it
import requests
from bs4 import BeautifulSoup

# URL of the website to scrape
url = "https://example.com"

# Send a GET request to the website
response = requests.get(url, timeout=30)
response.raise_for_status()

# Parse the HTML content
soup = BeautifulSoup(response.text, "html.parser")

# Find all anchor tags and extract their href attribute (skipping anchors without one)
links = [a.get("href") for a in soup.find_all("a") if a.get("href")]

# Print all links
for link in links:
    print(link)
```

```text
https://www.iana.org/domains/example
```
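If `bs4` is not available, Python's built-in `html.parser.HTMLParser` can do the same link extraction. This swaps Beautiful Soup for the stdlib parser; it also uses `urllib.parse.urljoin` to turn relative links such as `/about` into absolute URLs, which `a.get("href")` in the cell above returns as-is. `LinkExtractor` is an illustrative name, and the HTML snippet is made up so the sketch runs offline.

```python
from html.parser import HTMLParser
from urllib.parse import urljoin


class LinkExtractor(HTMLParser):
    """Collect absolute URLs from the href attribute of <a> tags."""

    def __init__(self, base_url):
        super().__init__()
        self.base_url = base_url
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            href = dict(attrs).get("href")
            if href:  # skip anchors without an href
                self.links.append(urljoin(self.base_url, href))


# Hypothetical HTML; with live data you would feed response.text instead
html = """
<p><a href="https://www.iana.org/domains/example">More</a>
<a href="/about">About</a> <a name="top">no href</a></p>
"""
parser = LinkExtractor("https://example.com")
parser.feed(html)
print(parser.links)  # ['https://www.iana.org/domains/example', 'https://example.com/about']
```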


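```python
# API with query parameters: hourly temperature forecast for Berlin from Open-Meteo
import requests

url = "https://api.open-meteo.com/v1/forecast"

params = {
    "latitude": 52.52,  # Berlin
    "longitude": 13.41,
    "hourly": "temperature_2m",
}
# requests URL-encodes params into the query string (?latitude=52.52&...)
requests.get(url, params=params, timeout=30).json()
```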

```text
{'latitude': 52.52,
 'longitude': 13.419998,
 'generationtime_ms': 0.045418739318847656,
 'utc_offset_seconds': 0,
 'timezone': 'GMT',
 'timezone_abbreviation': 'GMT',
 'elevation': 38.0,
 'hourly_units': {'time': 'iso8601', 'temperature_2m': '°C'},
 'hourly': {'time': ['2025-01-29T00:00',
   '2025-01-29T01:00',
   '2025-01-29T02:00',
   ...
```
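The `hourly` block in the response is columnar: parallel arrays where the i-th timestamp belongs with the i-th temperature. Most tables expect one row per record, so a common transform before loading is to zip the arrays together. A minimal sketch, assuming the payload carries a `temperature_2m` array alongside `time` (as `hourly_units` suggests); the payload below is a hypothetical, shortened stand-in with made-up values, `hourly_to_rows` is an illustrative name, and an in-memory SQLite table is the stand-in target.

```python
import sqlite3

# Hypothetical payload shaped like the "hourly" block (3 hours, made-up temperatures)
payload = {
    "hourly_units": {"time": "iso8601", "temperature_2m": "°C"},
    "hourly": {
        "time": ["2025-01-29T00:00", "2025-01-29T01:00", "2025-01-29T02:00"],
        "temperature_2m": [1.5, 1.2, 0.9],
    },
}


def hourly_to_rows(payload):
    """Pair the i-th timestamp with the i-th temperature: columns -> rows."""
    times = payload["hourly"]["time"]
    temps = payload["hourly"]["temperature_2m"]
    if len(times) != len(temps):
        raise ValueError("time and temperature_2m arrays differ in length")
    return list(zip(times, temps))


rows = hourly_to_rows(payload)
print(rows[0])  # ('2025-01-29T00:00', 1.5)

# Load one row per hour (in-memory SQLite as a stand-in target)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE hourly_temperature (observed_at TEXT, temperature_c REAL)")
conn.executemany("INSERT INTO hourly_temperature VALUES (?, ?)", rows)
count = conn.execute("SELECT COUNT(*) FROM hourly_temperature").fetchone()[0]
conn.close()
```

The explicit length check guards against silently dropping data, since plain `zip` stops at the shorter array.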

```python
# API pagination: fetch the first few pages of CPython issues from the GitHub REST API
# Note: this endpoint returns pull requests as well as issues, and unauthenticated
# requests are rate-limited, so keep the number of pages small
import requests

# Base URL
url = "https://api.github.com/repos/python/cpython/issues"

# Pagination settings
per_page = 5  # Number of results per page
page = 1  # Start with page 1
max_pages = 3  # Stop after this many pages (avoids requesting an extra page)

while page <= max_pages:
    # Define request parameters
    params = {"per_page": per_page, "page": page}

    # Make API request
    response = requests.get(url, params=params, timeout=30)

    # Stop on a failed request
    if response.status_code != 200:
        print(f"Error: {response.status_code}")
        break

    data = response.json()

    # Break loop if no more data
    if not data:
        print("No more data to fetch.")
        break

    # Print fetched issue titles
    print(f"Page {page} Results:")
    for issue in data:
        print(f"- {issue['title']} (#{issue['number']})")

    # Move to the next page
    page += 1
```


```text
Page 1 Results:
- readline error when saving interactive command history when history file path is a symlink to a relative path (#129453)
- _Py_TryIncrefCompareStackRef incorrectly listed as non-escaping in cases generator (#129452)
- gh-129438: Update ``--enable-experimental-jit`` section with install requirements (#129450)
- gh-111495: Add PyFile tests (#129449)
- gh-129250: allow pickle instances of generic classes (#129446)
Page 2 Results:
- Nested virtual environment support in site/venv modules (#129445)
- [3.12] gh-129345: null check for indent syslogmodule (GH-129348) (#129443)
- [3.13] gh-129345: null check for indent syslogmodule (GH-129348) (#129442)
- test_instrumentation failing randomly on free-threading CI (#129441)
- [3.10] gh-119461: Fix ThreadedVSOCKSocketStreamTest (GH-129171) (#129440)
Page 3 Results:
- Python 3.13.1-jit : issue when compiling from source (#129438)
- gh-129354: Use PyErr_FormatUnraisable() function (#129435)
- gh-101944: Clarify PyModule_AddObjectRe
```
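Separating "how to fetch one page" from "when to stop" makes the pagination loop reusable across APIs and testable without the network. A minimal sketch using a generator; `paginate` and `fake_fetch` are illustrative names, and the fake fetcher (12 items served 5 per page) stands in for a real HTTP call.

```python
from typing import Callable, Iterator


def paginate(fetch_page: Callable[[int], list], max_pages: int) -> Iterator:
    """Yield items page by page; stop at an empty page or after max_pages pages."""
    for page in range(1, max_pages + 1):
        items = fetch_page(page)
        if not items:  # an empty page means there is no more data
            return
        yield from items


# Fake fetcher standing in for an HTTP call: 12 items served 5 per page
ALL_ITEMS = [{"number": n} for n in range(1, 13)]


def fake_fetch(page: int, per_page: int = 5) -> list:
    start = (page - 1) * per_page
    return ALL_ITEMS[start:start + per_page]


numbers = [item["number"] for item in paginate(fake_fetch, max_pages=10)]
first_page = [item["number"] for item in paginate(fake_fetch, max_pages=1)]
print(len(numbers))  # 12 (pages 1-3, then the empty page 4 ends the loop)
print(first_page)    # [1, 2, 3, 4, 5]
```

For the GitHub cell, a fetcher could be `lambda page: requests.get(url, params={"per_page": 5, "page": page}, timeout=30).json()` (adding `raise_for_status()` in real use). GitHub also advertises the next page in the `Link` response header, which `requests` exposes as `response.links`; following that link is an alternative to counting pages.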