In [1]:
from dask_cloudprovider.aws import EC2Cluster 
from dask.distributed import Client
import configparser
import os
import contextlib
import re
import dask
from platform import python_version

In [2]:
# We need this function to get the AWS credentials from the config file to pass to the EC2Cluster

def get_aws_credentials():
    """Read in your AWS credentials file and convert to environment variables."""
    parser = configparser.RawConfigParser()
    
    parser.read(os.path.expanduser('~/.aws/config'))
    config = parser.items('default')
    
    parser.read(os.path.expanduser('~/.aws/credentials'))
    credentials = parser.items('default')
    
    all_credentials = {key.upper(): value for key, value in [*config, *credentials]}
    with contextlib.suppress(KeyError):
        all_credentials["AWS_REGION"] = all_credentials.pop("REGION")
        
    return all_credentials

In [3]:
# Pass in AWS Credentials + any extra packages you would like to install on cluster via `pip`
env_vars = get_aws_credentials()
env_vars["EXTRA_PIP_PACKAGES"] = "s3fs" # we are going to install s3fs library on the cluster

# Select software installed on scheduler + worker instances based on client Python + Dask versions
# versions need to match across client, scheduler, worker -- slight mismatches are OK, though
py_v = '-py' + re.findall(r'\d{1}.\d+', python_version())[0] # find the python version
dask_docker_tag = f"daskdev/dask:{dask.__version__ + py_v}" # get the dask docker tag that matches the client version
print('Docker Image: ', dask_docker_tag)

Docker Image:  daskdev/dask:2024.5.0-py3.11


In [31]:
# launch a cluster of 5 r5.large instances (10 vCPUs):
# 1 scheduler
# 4 workers (2 threads + 16 GB RAM each)
cluster = EC2Cluster(instance_type='r5.large',
                     n_workers=4,
                     security=False,
                     docker_image=dask_docker_tag,
                     env_vars=env_vars
)

Creating scheduler instance
Created instance i-0ce84e354d3460bc1 as dask-d2470c67-scheduler
Waiting for scheduler to run at 54.234.202.80:8786
Scheduler is running
Creating worker instance
Creating worker instance
Creating worker instance
Creating worker instance
Created instance i-05646e665b40119b9 as dask-d2470c67-worker-e0081726
Created instance i-0dd124b857278fdb5 as dask-d2470c67-worker-922f707f
Created instance i-04e5bc49ddb0af8bd as dask-d2470c67-worker-591b21c2
Created instance i-0c00c25537164b897 as dask-d2470c67-worker-9e7bf11d


In [40]:
client = Client(cluster)
client


+-------------+----------------+----------------+----------------+
| Package     | Client         | Scheduler      | Workers        |
+-------------+----------------+----------------+----------------+
| cloudpickle | 2.2.1          | 3.0.0          | 3.0.0          |
| distributed | 2023.6.0       | 2024.5.0       | 2024.5.0       |
| lz4         | 4.3.2          | 4.3.3          | 4.3.3          |
| msgpack     | 1.0.3          | 1.0.7          | 1.0.7          |
| numpy       | 1.24.3         | 1.26.4         | 1.26.4         |
| pandas      | 2.0.3          | 2.2.2          | 2.2.2          |
| python      | 3.11.5.final.0 | 3.11.9.final.0 | 3.11.9.final.0 |
| tornado     | 6.3.2          | 6.4            | 6.4            |
+-------------+----------------+----------------+----------------+


0,1
Connection method: Cluster object,Cluster type: dask_cloudprovider.EC2Cluster
Dashboard: http://54.234.202.80:8787/status,

0,1
Dashboard: http://54.234.202.80:8787/status,Workers: 4
Total threads: 8,Total memory: 61.30 GiB

0,1
Comm: tcp://172.31.35.189:8786,Workers: 4
Dashboard: http://172.31.35.189:8787/status,Total threads: 8
Started: 3 minutes ago,Total memory: 61.30 GiB

0,1
Comm: tcp://172.31.38.237:34209,Total threads: 2
Dashboard: http://172.31.38.237:42799/status,Memory: 15.33 GiB
Nanny: tcp://172.31.38.237:40835,
Local directory: /tmp/dask-scratch-space/worker-8_089x6q,Local directory: /tmp/dask-scratch-space/worker-8_089x6q

0,1
Comm: tcp://172.31.44.85:38249,Total threads: 2
Dashboard: http://172.31.44.85:45523/status,Memory: 15.33 GiB
Nanny: tcp://172.31.44.85:41897,
Local directory: /tmp/dask-scratch-space/worker-lzga52as,Local directory: /tmp/dask-scratch-space/worker-lzga52as

0,1
Comm: tcp://172.31.37.166:46127,Total threads: 2
Dashboard: http://172.31.37.166:44413/status,Memory: 15.33 GiB
Nanny: tcp://172.31.37.166:33491,
Local directory: /tmp/dask-scratch-space/worker-o3kke1f5,Local directory: /tmp/dask-scratch-space/worker-o3kke1f5

0,1
Comm: tcp://172.31.33.95:41595,Total threads: 2
Dashboard: http://172.31.33.95:43989/status,Memory: 15.33 GiB
Nanny: tcp://172.31.33.95:34055,
Local directory: /tmp/dask-scratch-space/worker-1szltjre,Local directory: /tmp/dask-scratch-space/worker-1szltjre


In [42]:
import boto3

username = 'username'
password = 'password'
rds = boto3.client('rds')
rds.get_waiter('db_instance_available') \
   .wait(DBInstanceIdentifier='final-instance')
ddb = rds.describe_db_instances()['DBInstances'][0]
host = ddb['Endpoint']['Address']
port = ddb['Endpoint']['Port']
database = 'finalproject' 

In [43]:
db = rds.describe_db_instances()['DBInstances'][0]
ENDPOINT = db['Endpoint']['Address']
PORT = db['Endpoint']['Port']
DBID = db['DBInstanceIdentifier']

import mysql.connector

# Connect to the MySQL database
conn =  mysql.connector.connect(host=ENDPOINT, 
                                user="username", 
                                passwd="password", 
                                port=PORT, 
                                database="finalproject")
cur = conn.cursor()

In [41]:
import dask.array as da

n = len(client.scheduler_info()['workers'])
a = da.ones(100, chunks=int(100/n))
a

Unnamed: 0,Array,Chunk
Bytes,800 B,200 B
Shape,"(100,)","(25,)"
Dask graph,4 chunks in 1 graph layer,4 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 800 B 200 B Shape (100,) (25,) Dask graph 4 chunks in 1 graph layer Data type float64 numpy.ndarray",100  1,

Unnamed: 0,Array,Chunk
Bytes,800 B,200 B
Shape,"(100,)","(25,)"
Dask graph,4 chunks in 1 graph layer,4 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [23]:
import pandas as pd
import dask.dataframe as dd

In [44]:
cur.execute("SELECT * FROM simulation_data LIMIT 10000")

# Fetch the results
data = cur.fetchall()
print(len(data))
column_names = [i[0] for i in cur.description]
pdf = pd.DataFrame(data, columns=column_names)

cur.close()
conn.close()

ddf = dd.from_pandas(pdf, npartitions=5)

3800


In [45]:
ddf.head()

Task exception was never retrieved
future: <Task finished name='Task-13476' coro=<Client._gather.<locals>.wait() done, defined at /Users/yingrongmao/anaconda3/lib/python3.11/site-packages/distributed/client.py:2196> exception=AllExit()>
Traceback (most recent call last):
  File "/Users/yingrongmao/anaconda3/lib/python3.11/site-packages/distributed/client.py", line 2205, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-15650' coro=<Client._gather.<locals>.wait() done, defined at /Users/yingrongmao/anaconda3/lib/python3.11/site-packages/distributed/client.py:2196> exception=AllExit()>
Traceback (most recent call last):
  File "/Users/yingrongmao/anaconda3/lib/python3.11/site-packages/distributed/client.py", line 2205, in wait
    raise AllExit()
distributed.client.AllExit


KeyboardInterrupt: 

In [47]:
result = ddf[["RunId", "num_nodes"]].groupby('num_nodes').mean().compute()
print(result)

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


KeyboardInterrupt: 

In [16]:
from sqlalchemy import create_engine
# Create a connection string
connection_string = f'postgresql://{username}:{password}@{host}:{port}/{database}'

# Create an engine
engine = create_engine(connection_string)

In [None]:
import psycopg2

conn = psycopg2.connect(f"dbname='finalproject' user='username' host={host} password='password' sslmode='require'")
cursor = conn.cursor()
cursor.execute("SELECT 1")
print(cursor.fetchone())
conn.close()


In [21]:
pdf = pd.read_sql('SELECT * FROM simulation_data', engine)

OperationalError: (psycopg2.OperationalError) received invalid response to SSL negotiation: J

(Background on this error at: https://sqlalche.me/e/14/e3q8)

In [20]:
import pandas as pd
import dask.dataframe as dd

# Use Pandas to read data
pdf = pd.read_sql('SELECT * FROM simulation_data', engine)

# Convert the Pandas DataFrame to a Dask DataFrame
ddf = dd.from_pandas(pdf, npartitions=7)


OperationalError: (psycopg2.OperationalError) received invalid response to SSL negotiation: J

(Background on this error at: https://sqlalche.me/e/14/e3q8)