# Query Optimization 

Exercise to use PostgreSQL schema and tables contained in [data.dmp](/query_optimization/data.dmp?raw=true) to summarize some (E)nvironmental, (S)ocial, (G)overnance, and Total Impact scores for fictitious entities listed on the S&P 500. Your requirements are as follows:
* Determine the most efficient method of joining the included `sp500`, `id_map`, and `esg_scores` tables
* Create a new `sp500_esg_scores` table that lists all available id, name, and score columns for the S&P 500 constituent entities
* Add a `rank` column to the new `sp500_esg_scores` table that ranks the S&P 500 constituent entities by percentile on `total_score` in ascending order
* Add a MEDIAN row to the `sp500_esg_scores` table that shows the median value for each score column across the S&P 500 constituents 
* Make sure that where S&P 500 constituent entities are missing score values they still appear in the `sp500_esg_scores` table and are ranked in the 0 percentile
* Suggest a database and ETL architecture if there were a much larger universe of companies eg. 50,000. This universe would be updated on a weekly basis and rankings would need to be recomputed upon update.

Outputs a `esg_analysis.dmp` file with the database code to replicate your solution as well as a `query.sql` file containing the SQL query used to produce the `sp500_esg_scores` table.

In [32]:
!python --version


Python 3.10.9


In [33]:
!pip list

Package              Version
-------------------- -----------
anyio                3.5.0
appnope              0.1.2
argon2-cffi          21.3.0
argon2-cffi-bindings 21.2.0
asttokens            2.0.5
attrs                22.1.0
Babel                2.11.0
backcall             0.2.0
beautifulsoup4       4.11.1
bleach               4.1.0
brotlipy             0.7.0
certifi              2022.12.7
cffi                 1.15.1
charset-normalizer   2.0.4
comm                 0.1.2
cryptography         38.0.4
debugpy              1.5.1
decorator            5.1.1
defusedxml           0.7.1
entrypoints          0.4
executing            0.8.3
fastjsonschema       2.16.2
flit_core            3.6.0
idna                 3.4
ipykernel            6.19.2
ipython              8.10.0
ipython-genutils     0.2.0
jedi                 0.18.1
Jinja2               3.1.2
json5                0.9.6
jsonschema           4.17.3
jupyter_client       7.4.9
jupyter_core         5.2.0
jupyter-server       1.23.4
jupyter

# Initiate PostgreSQL 

Unless already created, connect to Database yvesdb

In [34]:
import psycopg2

In [35]:
try:
    conn = psycopg2.connect("dbname=postgres")
except:
    print("Postgres error opening main db")

In [36]:
conn.autocommit = True
cur = conn.cursor()

In [37]:
try:
    cur.execute("CREATE DATABASE yvesblue")
except:
    print('DB yvesblue already exists!')

conn = psycopg2.connect("dbname=yvesblue")
cur = conn.cursor()

DB yvesblue already exists!


In [38]:
conn.commit()

# Create Table for "SP500"
instr_id text

In [12]:
try:
    cur.execute("CREATE TABLE sp500(\
                        instr_id TEXT PRIMARY KEY \
                                    )" \
               ) 
except:
    print('Table sp500 either already exists or there is another error')

conn.commit()

Table sp500 either already exists or there is another error


In [13]:
cur.execute('SELECT * from "sp500"')

In [41]:
# print(cur.fetchall()) # for debug

In [15]:
#drop for testing
#cur.execute('DROP TABLE IF EXISTS "sp500" CASCADE;')  
#conn.commit()

# Create Table for "ID_MAP"
  id serial,
  instr_id text,
  name text

In [16]:
try:
    cur.execute("CREATE TABLE id_map(\
                        id SERIAL PRIMARY KEY, \
                        instr_id TEXT NOT NULL, \
                        name TEXT NOT NULL \
                                    )" \
               ) 
except:
    print('Table id_map either already exists or there is another error')

conn.commit()

Table id_map either already exists or there is another error


In [17]:
cur.execute('SELECT * from "id_map"')

In [42]:
# print(cur.fetchall())  # for debug

In [19]:
#drop for testing
#cur.execute('DROP TABLE IF EXISTS "id_map" CASCADE;')  


# Create Table for "ESG_SCORES"
 
  id serial,
  total_score numeric(15,6),
  e_score numeric(15,6),
  s_score numeric(15,6),
  g_score numeric(15,6)

In [20]:
try:
    cur.execute("CREATE TABLE esg_scores(\
                        id SERIAL PRIMARY KEY, \
                        total_score FLOAT NOT NULL, \
                        e_score FLOAT NOT NULL, \
                        s_score FLOAT NOT NULL, \
                        g_score FLOAT NOT NULL \
                                    )" \
               ) 
except:
    print('Table esg_scores either already exists or there is another error')

conn.commit()

Table esg_scores either already exists or there is another error


In [21]:
cur.execute('SELECT * from "esg_scores"')

In [43]:
# print(cur.fetchall()) # for debug

In [23]:
#drop for testing
#cur.execute('DROP TABLE IF EXISTS "esg_scores" CASCADE;')  

# Combining the tables

SQL Schematic

SQL code to build the 'sp500_esg_scores' dataset
```
with sp500_mapped as
(
	select 
		id_map.id as id,
		sp500.instr_id as instr_id,
		id_map.name as name
	from sp500
	join id_map 
	on sp500.instr_id = id_map.instr_id
	order by sp500.instr_id asc
)
select 
	sp500_mapped.id as id,
	sp500_mapped.name as name,
	esg_scores.total_score as total_score,
	esg_scores.e_score as e_score,
	esg_scores.s_score as s_score,
	esg_scores.g_score as g_score,
	PERCENT_RANK() OVER(ORDER BY esg_scores.total_score DESC) as total_score_percentile_rank
from sp500_mapped
LEFT JOIN esg_scores
ON sp500_mapped.id = esg_scores.id 
ORDER BY total_score_percentile_rank asc
```



SQL code to build the MEDIAN row 
```
with sp500_with_scores as ( 
	with sp500_mapped as
	(
		select 
			id_map.id as id,
			sp500.instr_id as instr_id,
			id_map.name as name
		from sp500
		join id_map 
		on sp500.instr_id = id_map.instr_id
		order by sp500.instr_id asc
	)
	select 
		sp500_mapped.id as id,
		sp500_mapped.name as name,
		esg_scores.total_score as total_score,
		esg_scores.e_score as e_score,
		esg_scores.s_score as s_score,
		esg_scores.g_score as g_score,
		PERCENT_RANK() OVER(ORDER BY esg_scores.total_score DESC) as total_score_percentile_rank
	from sp500_mapped
	LEFT JOIN esg_scores
	ON sp500_mapped.id = esg_scores.id 
	ORDER BY total_score_percentile_rank asc
),
total_score_median as (
	select 
		1 as idx,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY total_score) AS median_total_score
	from sp500_with_scores
),
e_score_median as (
	select 
		1 as idx,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY e_score) AS median_e_score
	from sp500_with_scores
),
s_score_median as (
	select 
		1 as idx,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY s_score) AS median_s_score
	from sp500_with_scores
),
g_score_median as (
	select 
		1 as idx,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY g_score) AS median_g_score
	from sp500_with_scores
)
select 
	9999 as id,
	'**** MEDIAN ****' as name,
	total_score_median.median_total_score, 
	e_score_median.median_e_score,
	s_score_median.median_s_score,
	g_score_median.median_g_score,
	0.5000000000000001 as total_score_percentile_rank
from total_score_median
join e_score_median
on total_score_median.idx = e_score_median.idx
join s_score_median
on total_score_median.idx = s_score_median.idx
join g_score_median
on total_score_median.idx = g_score_median.idx
```

References:

[Summary Stats using Postgres by Muralidhar](https://towardsdatascience.com/how-to-derive-summary-statistics-using-postgresql-742f3cdc0f44)

[Fastest Way to Calculate Median by Bertrand](https://sqlperformance.com/2012/08/t-sql-queries/median)

# Create Table for "sp500_esg_scores"

With the following columns - id, name, total_score, e_score, s_score, g_score, rank

In [40]:
conn

<connection object at 0x10b36d3f0; dsn: 'dbname=yvesblue', closed: 0>

In [87]:
try:
    cur.execute("CREATE TABLE sp500_esg_scores(\
                        id SERIAL PRIMARY KEY, \
                        name TEXT NOT NULL, \
                        total_score FLOAT, \
                        e_score FLOAT, \
                        s_score FLOAT, \
                        g_score FLOAT, \
                        rank FLOAT \
                                    )" \
               ) 
except:
    print('Table sp500_esg_scores either already exists or there is another error')

conn.commit()

In [88]:
cur.execute('SELECT * from "sp500_esg_scores"')

In [89]:
print(cur.fetchall())

[]


# Insert into sp500_esg_scores

In [90]:
# test query
try:
    cur.execute("with sp500_mapped as \
                    ( \
                        select \
                            id_map.id as id, \
                            sp500.instr_id as instr_id, \
                            id_map.name as name \
                        from sp500 \
                        join id_map \
                        on sp500.instr_id = id_map.instr_id \
                        order by sp500.instr_id asc \
                    ) \
                    select \
                        sp500_mapped.id as id, \
                        sp500_mapped.name as name, \
                        esg_scores.total_score as total_score, \
                        esg_scores.e_score as e_score, \
                        esg_scores.s_score as s_score, \
                        esg_scores.g_score as g_score, \
                        PERCENT_RANK() OVER(ORDER BY esg_scores.total_score DESC) as rank \
                    from sp500_mapped \
                    LEFT JOIN esg_scores \
                    ON sp500_mapped.id = esg_scores.id  \
                    ORDER BY rank asc" \
               )
except:
    print('Problem querying into sp500_mapped')

conn.commit()

In [84]:
# print(cur.fetchall())  # for debug only

In [86]:
conn.commit()

In [91]:
try:
    cur.execute("WITH sp500_esg_scores_values AS \
                (\
                    with sp500_mapped as \
                    ( \
                        select \
                            id_map.id as id, \
                            sp500.instr_id as instr_id, \
                            id_map.name as name \
                        from sp500 \
                        join id_map \
                        on sp500.instr_id = id_map.instr_id \
                        order by sp500.instr_id asc \
                    ) \
                    select \
                        sp500_mapped.id as id, \
                        sp500_mapped.name as name, \
                        esg_scores.total_score as total_score, \
                        esg_scores.e_score as e_score, \
                        esg_scores.s_score as s_score, \
                        esg_scores.g_score as g_score, \
                        PERCENT_RANK() OVER(ORDER BY esg_scores.total_score DESC) as rank \
                    from sp500_mapped \
                    LEFT JOIN esg_scores \
                    ON sp500_mapped.id = esg_scores.id  \
                    ORDER BY rank asc \
                )\
                INSERT into sp500_esg_scores(id, name, total_score, e_score, s_score, g_score, rank) \
                SELECT id, name, total_score, e_score, s_score, g_score, rank from sp500_esg_scores_values" \
                )
except:
    print('Problem inserting into sp500_esg_scores')

conn.commit()

In [92]:
cur.execute('SELECT * from "sp500_esg_scores"')

In [94]:
# print(cur.fetchall()) # debug only

# Insert Median Row

In [None]:
try:
    cur.execute("WITH sp500_esg_scores_medians AS \
                (\
                    with sp500_with_scores as \
                    ( \ 
                        with sp500_mapped as \
                        (\
                            select \ 
                                id_map.id as id, \
                                sp500.instr_id as instr_id, \
                                id_map.name as name \
                            from sp500 \
                            join id_map \
                            on sp500.instr_id = id_map.instr_id \
                            order by sp500.instr_id asc \
                        )\
                    select  \
                        sp500_mapped.id as id, \
                        sp500_mapped.name as name, \
                        esg_scores.total_score as total_score, \
                        esg_scores.e_score as e_score, \
                        esg_scores.s_score as s_score, \
                        esg_scores.g_score as g_score, \
                        PERCENT_RANK() OVER(ORDER BY esg_scores.total_score DESC) as total_score_percentile_rank \
                    from sp500_mapped \
                    LEFT JOIN esg_scores \
                    ON sp500_mapped.id = esg_scores.id \
                    ORDER BY total_score_percentile_rank asc \
                    ),\
                    total_score_median as ( \
                        select \
                            1 as idx, \
                            PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY total_score) AS median_total_score \
                        from sp500_with_scores \
                    ), \
                    e_score_median as ( \
                        select \
                            1 as idx, \
                            PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY e_score) AS median_e_score \
                        from sp500_with_scores \
                    ), \
                    s_score_median as ( \
                        select \
                            1 as idx, \
                            PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY s_score) AS median_s_score \
                        from sp500_with_scores \
                    ), \
                    g_score_median as ( \
                        select \
                            1 as idx, \
                            PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY g_score) AS median_g_score \
                    from sp500_with_scores \
                    ) \
                    select \
                        9999 as id, \
                        '**** MEDIAN ****' as name, \
                        total_score_median.median_total_score as total_score, \
                        e_score_median.median_e_score as e_score, \
                        s_score_median.median_s_score as s_score, \
                        g_score_median.median_g_score as g_score, \
                        0.5000000000000001 as rank \
                    from total_score_median \
                    join e_score_median \
                        on total_score_median.idx = e_score_median.idx \
                    join s_score_median \
                        on total_score_median.idx = s_score_median.idx \
                    join g_score_median \
                        on total_score_median.idx = g_score_median.idx \
                )\
                INSERT into sp500_esg_scores(id, name, total_score, e_score, s_score, g_score, rank) \
                SELECT id, name, total_score, e_score, s_score, g_score, rank from sp500_esg_scores_medians" \
               )
except:
    print('Problem inserting medians into sp500_esg_scores')
conn.commit()

In [112]:
cur.execute('SELECT * from "sp500_esg_scores" where name = \'**** MEDIAN ****\'')

In [113]:
print(cur.fetchall())

[(9999, '**** MEDIAN ****', 53.015, 49.51, 45.54, 47.69, 0.5)]


# Closing Note

On the question on if there were a much larger universe of companies eg. 50,000, updated on a weekly basis and if rankings would need to be recomputed upon updates, I would offer for consideration:

1 - Partitioning the database by the update date of each week, which allows [among benefits](https://www.postgresql.org/docs/current/ddl-partitioning.html) potential increases in query performance, ability to retain newer and/or offload older aging/dated/dump partitions for the sake of saving compute/storage resources, and ease potential [Postgres administration issues](https://onesignal.com/blog/lessons-learned-from-5-years-of-scaling-postgresql/)

2 - Consideration of a more [modular ETL architecture](https://www.sqlshack.com/designing-a-modular-etl-architecture/) allowing ability for more loosely coupled pipelines but which requires careful pipeline orchestration and data quality checks to ensure data integrity.

3 - For aggregated metrics e.g. medians of the different scores by date partitions, consideration of separate dimension tables to avoid having to run compute expensive queries (ie storage is cheap). [Postgres could be used](https://www.narratordata.com/blog/using-postgresql-as-a-data-warehouse/) however there are other solutions in the market as well. Much of these would depend on cost vs performance tradeoffs.

# Screenshots

The following are screenshots from running queries on a local Postgres instance.

![image1](yves_query_test_1.png)

![image2](yves_query_test_2.png)

![image3](yves_query_test_3.png)

![image4](yves_query_test_4.png)

![image5](yves_query_test_5.png)