# ETL Job for Diagnosis Dataset Production
This is a small project I worked on to produce a diagnosis dataset by fetching the required data from Amazon Redshift database tables.

**Problem**: because the database tables contained a large number of records, querying them with the `redshift-connector` library took too long.

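**Solution**: introduce Spark into the analysis environment used by the business stakeholders. The tables are read through the Spark-Redshift connector, which stages query results in S3 rather than pulling every row through a single Python client.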

**Note**: because this was a real business application, only part of the project is shown here, and all sensitive information has been hidden.

The following Python script retrieves the data from the database tables and combines the resulting DataFrames into a single dataset.

In [None]:
from pyspark.sql import SparkSession, SQLContext
import argparse

def _parse_args(argv=None):
    """Parse the job's command-line options.

    Args:
        argv: Arguments to parse; ``None`` (the default) parses ``sys.argv[1:]``.

    Returns:
        An ``argparse.Namespace`` with the Redshift connection settings and
        the optional Spark dependency / IAM role settings.
    """
    parser = argparse.ArgumentParser(description="app inputs")
    # Connection essentials are required so a missing value fails here with a
    # clear message instead of producing e.g. "jdbc:redshift://None:...".
    parser.add_argument("--dbname", type=str, required=True, help="database name")
    parser.add_argument("--schema", type=str, required=True, help="redshift schema")
    parser.add_argument("--host", type=str, required=True, help="redshift database host")
    # 5439 is Redshift's default port.
    parser.add_argument("--port", type=int, default=5439, help="redshift port")
    parser.add_argument("--username", type=str, required=True, help="redshift username")
    # NOTE(review): a password passed on the command line is visible in process
    # listings and shell history; consider sourcing it from a secret store.
    parser.add_argument("--password", type=str, required=True, help="redshift password")
    parser.add_argument("--tempdir", type=str, required=True, help="s3 temp bucket uri")
    parser.add_argument("--driver", type=str, help="jdbc driver")
    parser.add_argument("--dependencies", type=str, help="dependencies for Spark-Redshift connection")
    parser.add_argument("--role", type=str, help="AWS IAM role")
    return parser.parse_args(argv)


def _read_redshift(spark, query, *, url, username, password, tempdir, role=None):
    """Run ``query`` on Redshift through the Spark-Redshift connector.

    The connector stages the query result under ``tempdir`` (an S3 URI) and
    Spark loads it from there.

    Args:
        spark: Active ``SparkSession``.
        query: SQL text to execute on Redshift.
        url: JDBC URL without credentials.
        username: Redshift user.
        password: Redshift password.
        tempdir: S3 URI used by the connector as staging area.
        role: Optional AWS IAM role ARN; omitted from the options when falsy.

    Returns:
        A Spark DataFrame with the query result.
    """
    reader = (
        spark.read.format("io.github.spark_redshift_community.spark.redshift")
        .option("url", url)
        # Credentials go in dedicated options rather than the JDBC URL, so that
        # characters such as '&' or '=' in the password cannot corrupt the URL.
        .option("user", username)
        .option("password", password)
        .option("query", query)
        .option("tempdir", tempdir)
    )
    if role:
        reader = reader.option("aws_iam_role", role)
    return reader.load()


def main():
    """Build the BMI/diagnosis dataset from Redshift tables.

    Reads diagnoses whose ``level1_code`` is 'ABC', 'DEF' or 'GHI' (with
    their level-1/level-2 codes). Reads health check-ups with ``bmi >= 27``.
    Left-joins the check-ups onto the diagnoses by ``member_id``.
    """
    args = _parse_args()

    builder = SparkSession.builder.enableHiveSupport()
    # Only set jar options that were supplied; otherwise the builder would be
    # handed None, which is not a valid jar list.
    if args.driver:
        builder = builder.config("spark.jars", args.driver)
    if args.dependencies:
        builder = builder.config("spark.jars.packages", args.dependencies)
    spark = builder.getOrCreate()

    connection = dict(
        url=f"jdbc:redshift://{args.host}:{args.port}/{args.dbname}",
        username=args.username,
        password=args.password,
        tempdir=args.tempdir,
        role=args.role,
    )

    # NOTE(review): args.schema is interpolated into SQL unescaped; acceptable
    # only because it comes from the operator's command line.

    # First DB query: diagnoses restricted to selected level-1 codes.
    # No ORDER BY: the connector unloads results to S3 in parallel and Spark
    # does not preserve row order across partitions, so a server-side sort
    # only adds cost. Use DataFrame.orderBy downstream if order matters.
    diagnosis_query = f"""
    SELECT
        d.member_id,
        d.date_visit,
        m.level1_code,
        m.level2_code
    FROM {args.schema}.diagnosis AS d
    INNER JOIN {args.schema}.diagnosis_master AS m
    ON d.diagnosis_code = m.diagnosis_code
    WHERE m.level1_code IN ('ABC', 'DEF', 'GHI')
    """
    diagnosis_df = _read_redshift(spark, diagnosis_query, **connection)

    # Second DB query: check-ups with BMI >= 27.
    # Schema and table are separated by a dot; without it the schema name
    # would be read as the table and "health_checkup" as its alias.
    bmi_query = f"""
        SELECT
            member_id,
            date_checkup,
            bmi
        FROM {args.schema}.health_checkup
        WHERE bmi >= 27
    """
    bmi_df = _read_redshift(spark, bmi_query, **connection)

    # Dataset creation: joining on the column name keeps a single member_id
    # column, so later references to "member_id" are not ambiguous.
    bmi_diagnosis = diagnosis_df.join(bmi_df, on="member_id", how="left")

    # Rest of code
    # ...

if __name__ == "__main__":
    # Start the job only when this file is executed as a script;
    # importing the module has no side effects.
    main()