# ETL with PySpark and Airflow Demo Walkthrough

This notebook is a companion to the demo video. You can watch the video on its own, work through this notebook without the video, or follow along in the notebook while you watch.

## Installation and Setup

### Confirm the locations of JAVA_HOME and SPARK_HOME

In [1]:
# Find JAVA_HOME
import os
print(os.environ['JAVA_HOME'])

# Find SPARK_HOME
print(os.environ['SPARK_HOME'])

# Confirm path to spark-submit
print(os.environ['SPARK_HOME'] + '/bin/spark-submit')

/opt/homebrew/opt/openjdk@22
/opt/homebrew/Cellar/apache-spark/3.5.1/libexec
/opt/homebrew/Cellar/apache-spark/3.5.1/libexec/bin/spark-submit


### Initialize virtual environment and install dependencies

This should already be done before you start the notebook, but if you haven't, make and activate a virtual environment now.
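To confirm that the notebook kernel is actually running inside the virtual environment, you can compare `sys.prefix` with `sys.base_prefix`: inside an environment created with the standard `venv` module (or a recent `virtualenv`), the two differ. This is a minimal sketch; the helper name `in_virtualenv` is illustrative, and it does not detect conda environments.

```python
import sys


def in_virtualenv() -> bool:
    """Return True if the interpreter runs inside a venv/virtualenv.

    Inside a virtual environment, sys.prefix points at the environment,
    while sys.base_prefix still points at the base Python installation.
    """
    return sys.prefix != sys.base_prefix


print(f"Running in a virtual environment: {in_virtualenv()}")
print(f"Interpreter prefix: {sys.prefix}")
```

If this prints `False`, activate the environment and restart the kernel from it before installing the dependencies.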

In [2]:
# Install dependencies
%pip install pyspark apache-airflow dag-factory pytest

Collecting pyspark
  Using cached pyspark-3.5.1-py2.py3-none-any.whl
Collecting apache-airflow
  Using cached apache_airflow-2.9.3-py3-none-any.whl.metadata (43 kB)
Collecting dag-factory
  Using cached dag_factory-0.19.0-py2.py3-none-any.whl.metadata (4.9 kB)
Collecting pytest
  Using cached pytest-8.3.2-py3-none-any.whl.metadata (7.5 kB)
Collecting py4j==0.10.9.7 (from pyspark)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Collecting alembic<2.0,>=1.13.1 (from apache-airflow)
  Using cached alembic-1.13.2-py3-none-any.whl.metadata (7.4 kB)
Collecting argcomplete>=1.10 (from apache-airflow)
  Using cached argcomplete-3.5.0-py3-none-any.whl.metadata (16 kB)
Collecting asgiref (from apache-airflow)
  Using cached asgiref-3.8.1-py3-none-any.whl.metadata (9.3 kB)
Collecting attrs>=22.1.0 (from apache-airflow)
  Using cached attrs-24.2.0-py3-none-any.whl.metadata (11 kB)
Collecting blinker>=1.6.2 (from apache-airflow)
  Using cached blinker-1.8.2-py3-none-any.whl.meta

### Verify PySpark

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ETL Demo").getOrCreate()
df = spark.createDataFrame([(1, 'John'), (2, 'Doe')], ['id', 'name'])
df.show()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/09 12:55:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+---+----+
| id|name|
+---+----+
|  1|John|
|  2| Doe|
+---+----+



### Set Up Input Data

In [5]:
"""
Generate a CSV file of 50 lines with the following columns: 
id, name, age, city, country, position, salary
"""
import random
import csv

first_names = ['John', 'Jane', 'Doe', 'Alice', 'Bob', 'Charlie', 'David', 'Eve', 'Frank', 'Grace']
last_names = ['Smith', 'Johnson', 'Williams', 'Jones', 'Brown', 'Davis', 'Miller', 'Wilson', 'Moore', 'Taylor']
cities = ['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix', 'Philadelphia', 'San Antonio', 'San Diego', 'Dallas', 'San Jose']
countries = ['USA', 'Canada', 'Mexico', 'Brazil', 'Argentina', 'Chile', 'Peru', 'Colombia', 'Venezuela', 'Ecuador']
positions = ['Software Engineer', 'Data Scientist', 'Product Manager', 'Sales Manager', 'Marketing Manager', 'HR Manager', 'Accountant', 'Lawyer', 'Doctor', 'Nurse']
salaries = [50000, 60000, 70000, 80000, 90000, 100000, 110000, 120000, 130000, 140000]

# Make the data folder
if not os.path.exists('data'):
    os.makedirs('data')

with open('data/input.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['id', 'name', 'age', 'city', 'country', 'position', 'salary'])
    for i in range(50):
        row_id = i + 1  # avoid shadowing the built-in id() for the rest of the notebook
        name = random.choice(first_names) + ' ' + random.choice(last_names)
        age = random.randint(20, 60)
        city = random.choice(cities)
        country = random.choice(countries)
        position = random.choice(positions)
        salary = random.choice(salaries)
        writer.writerow([row_id, name, age, city, country, position, salary])

print('CSV file generated')
    

CSV file generated
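Before building the pipeline, a quick standard-library check can confirm that the file has the expected header and row count. A minimal sketch; the helper `check_csv` is hypothetical and not part of the project files.

```python
import csv

EXPECTED_COLUMNS = ['id', 'name', 'age', 'city', 'country', 'position', 'salary']


def check_csv(path, expected_columns, expected_rows):
    """Raise ValueError if the CSV header or data-row count is unexpected.

    Returns the number of data rows (header excluded) when both checks pass.
    """
    with open(path, newline='') as f:
        reader = csv.reader(f)
        header = next(reader)
        n_rows = sum(1 for _ in reader)
    if header != expected_columns:
        raise ValueError(f"Unexpected header: {header}")
    if n_rows != expected_rows:
        raise ValueError(f"Expected {expected_rows} data rows, found {n_rows}")
    return n_rows
```

For the file generated above, `check_csv('data/input.csv', EXPECTED_COLUMNS, 50)` should return 50.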


### Set Up Pipeline Functions

In [None]:
# Create src directory
if not os.path.exists('src'):
    os.makedirs('src')

The cell below writes the ETL pipeline functions to src/etl.py. Run that file to generate the output data, and make sure it works before moving on.

In [8]:
%%writefile src/etl.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

def extract_data(spark, file_path):
    # Extract data from a CSV file
    return spark.read.csv(file_path, header=True, inferSchema=True)

def transform_data(df):
    # Perform a simple transformation: filter and select specific columns
    return df.filter(col("age") > 35).select("name", "age", "city")

def load_data(df, output_path):
    # Load transformed data into a new CSV file
    df.write.csv(output_path, header=True, mode="overwrite")

if __name__ == "__main__":
    # Initialize Spark session
    spark = SparkSession.builder.appName("Simple ETL").getOrCreate()

    # Define file paths
    input_file = "data/input.csv"
    output_file = "data/output"

    # Execute ETL process
    data = extract_data(spark, input_file)
    transformed_data = transform_data(data)
    load_data(transformed_data, output_file)

    # Stop Spark session
    spark.stop()

Writing src/etl.py
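`transform_data` keeps the rows where `age` is greater than 35 and projects the `name`, `age`, and `city` columns. To cross-check the Spark output on a small file, the same logic can be written in plain Python. This is a hypothetical reference helper (`reference_transform`), not part of `src/etl.py`.

```python
import csv


def reference_transform(input_path):
    """Plain-Python equivalent of transform_data: keep age > 35, select name/age/city."""
    with open(input_path, newline="") as f:
        return [
            {"name": row["name"], "age": int(row["age"]), "city": row["city"]}
            for row in csv.DictReader(f)
            if int(row["age"]) > 35
        ]
```

`len(reference_transform("data/input.csv"))` should match the number of rows Spark writes to `data/output`.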


In [30]:
# Run file
%run src/etl.py

24/08/09 13:16:19 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
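`df.write.csv(output_path, ...)` does not produce a single file: Spark writes a directory (`data/output/`) containing one `part-*.csv` file per partition, plus, typically, a `_SUCCESS` marker file. Because `load_data` passes `header=True`, each part file carries its own header row. The sketch below reads those part files back with the standard library for a quick look at the results; the helper `read_spark_csv_dir` is hypothetical.

```python
import csv
import glob
import os


def read_spark_csv_dir(output_dir):
    """Collect rows from every part-*.csv file Spark wrote to output_dir.

    Each part file has its own header (header=True), so a DictReader is
    opened per file and all rows are returned as a list of dicts.
    Non-data files such as _SUCCESS are skipped by the glob pattern.
    """
    rows = []
    for part in sorted(glob.glob(os.path.join(output_dir, "part-*.csv"))):
        with open(part, newline="") as f:
            rows.extend(csv.DictReader(f))
    return rows
```

For example, `rows = read_spark_csv_dir("data/output")` followed by `print(len(rows), rows[:3])`. If you need a single output file instead, calling `coalesce(1)` on the DataFrame before writing yields one part file, at the cost of funneling all the data through a single task; that is fine for a dataset this small.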


## DAG Setup

In [9]:
# Create dags directory
if not os.path.exists('dags'):
    os.makedirs('dags')

In [10]:
%%writefile dags/etl_dag.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
import os
import sys

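# Resolve paths from the project root (the parent of this dags/ folder):
# the Airflow task's working directory is not guaranteed to be the project root
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))

# Add src directory to the system path
sys.path.append(os.path.join(PROJECT_ROOT, 'src'))

# Import ETL functions
from etl import extract_data, transform_data, load_data

def etl_pipeline():
    print("Setting up pipeline...")
    # Set up the Spark session
    spark = SparkSession.builder.appName("Airflow ETL").getOrCreate()

    # Define absolute file paths
    input_file = os.path.join(PROJECT_ROOT, "data", "input.csv")
    output_file = os.path.join(PROJECT_ROOT, "data", "output")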

    # Execute ETL process
    data = extract_data(spark, input_file)
    transformed_data = transform_data(data)
    load_data(transformed_data, output_file)

    # Stop the Spark session
    spark.stop()
    print("Finito!")

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
with DAG(
    'simple_etl_dag_from_script',
    default_args=default_args,
    schedule='@daily',
    catchup=False,
) as dag:

    # Define the ETL task
    etl_task = PythonOperator(
        task_id='run_etl',
        python_callable=etl_pipeline,
    )

    etl_task

Writing dags/etl_dag.py


### Connect the DAG file to Airflow home

In [11]:
# Option 1: Copy etl_dag.py to Airflow's default dags folder (command below).
# etl_dag.py imports from ../src relative to its own location, so src/etl.py
# has to be reachable from there too.

#!cp dags/etl_dag.py ~/airflow/dags

# Option 2: Update the Airflow configuration to point at this project's dags folder.
# In the config file, replace the dags_folder line with the following:
# dags_folder = /path/to/this/project/dags

#!open ~/airflow/airflow.cfg
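A third option, which avoids editing airflow.cfg, is Airflow's environment-variable override: any config key can be set as `AIRFLOW__{SECTION}__{KEY}`, so `AIRFLOW__CORE__DAGS_FOLDER` overrides `dags_folder` in the `[core]` section. Processes launched from this notebook inherit its environment, so setting the variable before starting the webserver and scheduler below should be enough. A minimal sketch, assuming the notebook runs from the project root; the helper `point_airflow_at` is hypothetical.

```python
import os


def point_airflow_at(dags_dir="dags"):
    """Set AIRFLOW__CORE__DAGS_FOLDER to the absolute path of dags_dir.

    Only processes started afterwards from this kernel (subprocess or
    `!airflow ...`) see the variable; a scheduler started in another
    terminal does not.
    """
    dags_path = os.path.abspath(dags_dir)
    os.environ["AIRFLOW__CORE__DAGS_FOLDER"] = dags_path
    return dags_path


print("dags_folder ->", point_airflow_at("dags"))
```

Running `!airflow config get-value core dags_folder` afterwards should print the same path.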


### Start Airflow database, webserver, and scheduler

In [12]:
# On Airflow 2.7+, `airflow db init` is deprecated in favor of `airflow db migrate`
!airflow db init

DB: sqlite:////Users/jamison.ducey/airflow/airflow.db
[2024-08-09T13:03:01.391-0600] {migration.py:215} INFO - Context impl SQLiteImpl.
[2024-08-09T13:03:01.391-0600] {migration.py:218} INFO - Will assume non-transactional DDL.
[2024-08-09T13:03:01.447-0600] {migration.py:215} INFO - Context impl SQLiteImpl.
[2024-08-09T13:03:01.448-0600] {migration.py:218} INFO - Will assume non-transactional DDL.
[2024-08-09T13:03:01.448-0600] {db.py:1625} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
WARNI [unusual_prefix_513eeade7e200caf78e1e8d4f2a9fa297ae49150_example_python_operator] The virtalenv_python example task requires virtualenv, please install it.
WARNI [unusual_prefix_50103b34285308bcc84e1c52283dc9ba2be4308e_tutorial

In [16]:
import getpass
import subprocess

# Prompt for the password
password = getpass.getpass("Enter the password for the new Airflow user: ")

# Construct the command
command = [
    "airflow", "users", "create",
    "--username", "admin",
    "--firstname", "John",
    "--lastname", "Doe",
    "--role", "Admin",
    "--email", "admin@example.org",
    "--password", password
]

# Run the command
subprocess.run(command)



admin already exist in the db


CompletedProcess(args=['airflow', 'users', 'create', '--username', 'admin', '--firstname', 'John', '--lastname', 'Doe', '--role', 'Admin', '--email', 'admin@example.org', '--password', 'admin'], returncode=0)

In [14]:
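import subprocess

# Start the Airflow webserver, logging to a file: an unread subprocess.PIPE
# can fill up and block a long-running process
webserver_log = open("airflow-webserver.log", "w")
webserver_process = subprocess.Popen(["airflow", "webserver", "--port", "8080"],
                                     stdout=webserver_log, stderr=subprocess.STDOUT)

print("Airflow webserver started")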


Airflow webserver started


In [15]:
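# Start the Airflow scheduler, logging to a file rather than to an unread pipe
scheduler_log = open("airflow-scheduler.log", "w")
scheduler_process = subprocess.Popen(["airflow", "scheduler"],
                                     stdout=scheduler_log, stderr=subprocess.STDOUT)

print("Airflow scheduler started")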

Airflow scheduler started
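The webserver usually takes several seconds to start, so opening the UI right away can show a connection error. A minimal polling sketch using only the standard library; it assumes the webserver's `/health` endpoint on port 8080, and the helper `wait_for_http` is hypothetical.

```python
import time
import urllib.request


def wait_for_http(url, timeout=60.0, interval=2.0):
    """Poll url until it answers with HTTP 200 or timeout seconds pass.

    Returns True as soon as a 200 response arrives, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=interval) as resp:
                if resp.status == 200:
                    return True
        except OSError:
            # URLError, HTTPError, refused connections and timeouts are all
            # OSError subclasses: the server is not ready yet, so retry
            pass
        time.sleep(interval)
    return False
```

For example: `if wait_for_http("http://localhost:8080/health"): print("Airflow UI is up")`.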


In [18]:
# Open the Airflow UI in the default browser (works on any OS, unlike macOS's `open`)
import webbrowser
webbrowser.open("http://localhost:8080")

In [19]:
# Stop the webserver and scheduler processes when you're finished. Rerun the cells above to restart them.
webserver_process.terminate()
scheduler_process.terminate()

print("Airflow webserver and scheduler stopped")

Airflow webserver and scheduler stopped


## Set Up a YAML Config for dag-factory

In [20]:
%%writefile dags/etl_cfg.yml

simple_etl_dag_auto:
  default_args:
    owner: 'airflow'
    start_date: '2024-01-01'
    retries: 1
  schedule_interval: '@daily'
  catchup: False
  tasks:
    etl_task:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: 'etl_pipeline'
      python_callable_file: /path/to/this/project/dags/etl_dag.py  # absolute path to dags/etl_dag.py in this project

Writing dags/etl_cfg.yml


### Write DAG generator script

In [21]:
%%writefile dags/generate_dags.py

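from airflow import DAG  # Airflow's safe-mode DAG discovery only parses files that mention both "airflow" and "dag"
import dagfactory
from pathlib import Path

# Locate the config next to this file, independent of the current working directory
config_file = Path(__file__).resolve().parent / "etl_cfg.yml"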
print(f"config_file: {config_file}")
dag_factory = dagfactory.DagFactory(config_file)

dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())

Writing dags/generate_dags.py
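The `from airflow import DAG` line in generate_dags.py matters because of Airflow's DAG discovery safe mode (`dag_discovery_safe_mode`, on by default): when scanning the dags folder, Airflow only parses Python files whose contents include both the strings "airflow" and "dag", case-insensitively. The sketch below approximates that documented heuristic to show which files in `dags/` would be considered; it is not Airflow's own code, and the helper names are hypothetical.

```python
from pathlib import Path


def might_contain_dag(path):
    """Approximate Airflow's safe-mode check: file mentions 'airflow' and 'dag'."""
    content = Path(path).read_bytes().lower()
    return b"airflow" in content and b"dag" in content


def discoverable_files(dags_dir="dags"):
    """Map each .py file in dags_dir to whether it passes the heuristic."""
    return {p.name: might_contain_dag(p) for p in sorted(Path(dags_dir).glob("*.py"))}
```

`discoverable_files("dags")` should report `True` for both etl_dag.py and generate_dags.py; without the `DAG` import, generate_dags.py would mention "dag" (via `dagfactory`) but not "airflow", and would be skipped.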


Test the configuration manually by running the DAG generator script.

In [31]:
# Run the script
!python dags/generate_dags.py

config_file: /Users/jamison.ducey/Desktop/etl_with_copilot/dags/etl_cfg.yml


## Unit Testing

In [22]:
# Create tests directory
if not os.path.exists('tests'):
    os.makedirs('tests')

In [23]:
%%writefile tests/test_etl.py

import pytest
from pyspark.sql import SparkSession

# Add src directory to the system path
import sys
import os
sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/../src'))

from etl import extract_data, transform_data, load_data

@pytest.fixture(scope="module")
def spark():
    # Setup: Create a Spark session
    spark = SparkSession.builder.appName("Test ETL").getOrCreate()
    yield spark
    # Teardown: Stop the Spark session
    spark.stop()

def test_extract_data(spark, tmp_path):
    # Create a test CSV file in pytest's per-test temporary directory
    test_file = tmp_path / "test_input.csv"
    test_file.write_text("name,age,city\nJohn Doe,45,New York\nJane Doe,30,Los Angeles\nAlice,40,Chicago\n")

    # Test the extract_data function
    df = extract_data(spark, str(test_file))

    # Define the expected output
    expected_data = [("John Doe", 45, "New York"),
                     ("Jane Doe", 30, "Los Angeles"),
                     ("Alice", 40, "Chicago")]
    expected_df = spark.createDataFrame(expected_data, ["name", "age", "city"])

    # Check if the result matches the expected output
    # (no manual cleanup needed: pytest manages tmp_path)
    assert df.collect() == expected_df.collect()

def test_transform_data(spark):
    # Create a DataFrame with test data
    test_data = [("John Doe", 45, "New York"),
                 ("Jane Doe", 30, "Los Angeles"),
                 ("Alice", 40, "Chicago")]
    df = spark.createDataFrame(test_data, ["name", "age", "city"])
    
    # Apply the transform_data function
    result_df = transform_data(df)
    
    # Define the expected output
    expected_data = [("John Doe", 45, "New York"),
                     ("Alice", 40, "Chicago")]
    expected_df = spark.createDataFrame(expected_data, ["name", "age", "city"])
    
    # Check if the result matches the expected output
    assert result_df.collect() == expected_df.collect()

def test_load_data(spark, tmp_path):
    # Create a DataFrame with test data
    test_data = [("John Doe", 45, "New York"),
                 ("Alice", 40, "Chicago")]
    df = spark.createDataFrame(test_data, ["name", "age", "city"])

    # Write into pytest's per-test temporary directory (cleaned up by pytest)
    output_path = str(tmp_path / "test_output")

    # Apply the load_data function
    load_data(df, output_path)

    # Read the output data back
    result_df = spark.read.csv(output_path, header=True, inferSchema=True)

    # Spark may write several part files and read them back in any order,
    # so compare the rows without relying on their order
    assert sorted(result_df.collect()) == sorted(df.collect())

Writing tests/test_etl.py


Run the tests manually

In [24]:
# Run the tests
!pytest tests/test_etl.py

platform darwin -- Python 3.12.4, pytest-8.3.2, pluggy-1.5.0
rootdir: /Users/jamison.ducey/Desktop/etl_with_copilot
plugins: time-machine-2.15.0, anyio-4.4.0
collected 3 items

tests/test_etl.py ...                                                    [100%]



Add a GitHub Actions workflow for CI/CD testing

In [25]:
# Make .github/workflows directory
if not os.path.exists('.github/workflows'):
    os.makedirs('.github/workflows')

In [26]:
%%writefile .github/workflows/ci.yml

name: CI

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest

    # No Spark service container is needed: the tests start PySpark in local
    # mode inside the pytest process, which only requires Java on the runner.
    steps:
    - name: Checkout code
      uses: actions/checkout@v4

    - name: Set up Java
      uses: actions/setup-java@v4
      with:
        distribution: 'temurin'
        java-version: '17'

    - name: Set up Python
      uses: actions/setup-python@v5
      with:
        python-version: '3.12'  # matches the local Python 3.12 used in this demo

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt

    - name: Run PySpark tests
      run: |
        pytest tests/test_etl.py

  deploy:
    runs-on: ubuntu-latest
    needs: test  # This ensures the deployment only runs if the test job succeeds

    steps:
    - name: Checkout code
      uses: actions/checkout@v4

    - name: Deploy application
      run: |
        # Add your deployment commands here
        echo "Deploying application..."


Writing .github/workflows/ci.yml


Generate requirements.txt

In [27]:
# Generate requirements.txt
with open('requirements.txt', 'w') as f:
    f.write('pyspark\n')
    f.write('apache-airflow\n')
    f.write('dag-factory\n')
    f.write('pytest\n')
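The requirements file above is unpinned, so CI installs whatever versions are current when it runs. One option is to pin the versions installed in this environment (the install log above shows, for example, pyspark 3.5.1 and apache-airflow 2.9.3). A minimal sketch using `importlib.metadata`; the helper name `pinned_requirements` is hypothetical, and packages that aren't installed are left unpinned.

```python
from importlib.metadata import PackageNotFoundError, version


def pinned_requirements(packages):
    """Return requirement lines pinned to the installed version when available."""
    lines = []
    for name in packages:
        try:
            lines.append(f"{name}=={version(name)}")
        except PackageNotFoundError:
            lines.append(name)  # not installed here; leave unpinned
    return lines
```

Writing the lines returned by `pinned_requirements(['pyspark', 'apache-airflow', 'dag-factory', 'pytest'])` to requirements.txt, one per line, replaces the unpinned names. Airflow's installation documentation also recommends installing against its published constraints file for reproducible installs.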

Make .gitignore file

In [28]:
%%writefile .gitignore

# pytest cache
.pytest_cache
# Virtual environment
.venv
# Python bytecode files
**/__pycache__

Writing .gitignore


Push to your repository and watch the jobs run in GitHub Actions.

Finished, congratulations!