In [40]:
import os

import boto3
import pandas as pd
from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import ClientError
from sklearn.datasets import load_iris
from sqlalchemy import create_engine
from sqlalchemy import text
from sqlalchemy.engine import URL

In [None]:
# Database connection settings, read from the environment.
# Each value is None when the corresponding variable is not set.
DB_USER = os.environ.get('DB_USER')
DB_PASS = os.environ.get('DB_PASS')
DB_HOST = os.environ.get('DB_HOST')
DB_NAME = os.environ.get('DB_NAME')
# The port can be looked up in the Yandex Cloud console.
DB_PORT = os.environ.get('DB_PORT')
# Filesystem path to the root.crt certificate; prefer an absolute path without ~.
SSL_PATH = os.environ.get('SSL_PATH')

# Object-storage settings, also read from the environment.
S3_ENDPOINT_URL = os.environ.get('S3_ENDPOINT_URL')
S3_ACCESS_KEY = os.environ.get('S3_ACCESS')
S3_SECRET_KEY = os.environ.get('S3_SECRET')
S3_BUCKET_NAME = os.environ.get('S3_BUCKET_NAME')

In [3]:
# Shared boto3 S3 resource built from the endpoint and credentials configured above.
# The helper functions below use it as their default `s3_resource` argument.
S3_RESOURCE = boto3.resource(
    service_name="s3",
    aws_access_key_id=S3_ACCESS_KEY,
    aws_secret_access_key=S3_SECRET_KEY,
    endpoint_url=S3_ENDPOINT_URL,
)


def upload_file_to_s3(file_path, bucket_name, object_name=None, s3_resource=S3_RESOURCE):
    """Upload a local file to an S3 bucket.

    Args:
        file_path: Path of the local file to upload.
        bucket_name: Name of the destination bucket.
        object_name: Key to store the file under. When None, the base name of
            ``file_path`` is used.
        s3_resource: boto3 S3 resource used for the transfer.

    Returns:
        True when the upload succeeded, False when S3 rejected it.
    """
    if object_name is None:
        object_name = os.path.basename(file_path)

    try:
        s3_resource.Bucket(bucket_name).upload_file(file_path, object_name)
    # boto3 re-raises client errors that occur during an upload as
    # S3UploadFailedError, which is not a ClientError subclass, so both are caught.
    except (ClientError, S3UploadFailedError) as e:
        print(f"Failed to upload {file_path} to {bucket_name}/{object_name}: {e}")
        return False

    return True


def download_file_from_s3(bucket_name, object_name, file_path, s3_resource=S3_RESOURCE):
    """Download an object from an S3 bucket to a local file.

    Args:
        bucket_name: Name of the source bucket.
        object_name: Key of the object to download.
        file_path: Local path the object is written to.
        s3_resource: boto3 S3 resource used for the transfer.

    Returns:
        True when the download succeeded, False when S3 reported a client
        error (the error is printed so the failure is not silent).
    """
    try:
        s3_resource.Bucket(bucket_name).download_file(object_name, file_path)
    except ClientError as e:
        print(f"Failed to download {bucket_name}/{object_name} to {file_path}: {e}")
        return False

    return True

### Загружаем данные в S3 в формате parquet

In [4]:
# Load the iris dataset and wrap its feature matrix in a DataFrame
# with explicit snake_case column names.
iris = load_iris()

df = pd.DataFrame(
    iris.data,
    columns=[
        "sepal_length",
        "sepal_width",
        "petal_length",
        "petal_width",
    ],
)

# Preview the first rows.
df.head()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width
0,5.1,3.5,1.4,0.2
1,4.9,3.0,1.4,0.2
2,4.7,3.2,1.3,0.2
3,4.6,3.1,1.5,0.2
4,5.0,3.6,1.4,0.2


In [5]:
# Write the frame to a local parquet file, leaving the row index out of the file.
df.to_parquet(path="iris.parquet", index=False)

In [6]:
# List the working directory (long format) to inspect the files present.
!ls -l

total 104
drwxr-xr-x 3 evgenii evgenii  4096 Jan  9 15:22 infra
-rw-r--r-- 1 evgenii evgenii  4350 Jan  9 16:01 iris.parquet
-rw-r--r-- 1 evgenii evgenii  4350 Jan  9 15:55 iris_s3.parquet
-rw-r--r-- 1 evgenii evgenii  1421 Jan  9 01:59 notes.txt
-rw-r--r-- 1 evgenii evgenii   167 Jan  9 02:00 requirements.txt
-rw-r--r-- 1 evgenii evgenii 71688 Jan  9 15:57 s3_db.ipynb
drwxr-xr-x 7 evgenii evgenii  4096 Jan  9 02:02 venv-s3-mysql


In [7]:
# Push the local parquet file to the bucket under the same key;
# the cell displays the helper's boolean result.
upload_file_to_s3("iris.parquet", S3_BUCKET_NAME, "iris.parquet")

True

### Загружаем данные из S3 (EXTRACT)

In [8]:
# Fetch the object from S3 and stop if the download failed, so that a stale
# local iris_s3.parquet left over from an earlier run is never read by mistake.
downloaded = download_file_from_s3(
    bucket_name=S3_BUCKET_NAME,
    object_name="iris.parquet",
    file_path="iris_s3.parquet"
)
if not downloaded:
    raise RuntimeError("Failed to download iris.parquet from S3")

df = pd.read_parquet("iris_s3.parquet")

In [9]:
# Preview the first five rows of the frame read back from parquet.
df.head(n=5)

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width
0,5.1,3.5,1.4,0.2
1,4.9,3.0,1.4,0.2
2,4.7,3.2,1.3,0.2
3,4.6,3.1,1.5,0.2
4,5.0,3.6,1.4,0.2


### Преобразуем данные (TRANSFORM)

In [10]:
def dummy_model(df):
    """Add a 'prediction' column to ``df`` in place.

    The prediction for a row is the mean of that row's values cast to int.
    Nothing is returned; the frame passed in is modified.
    """
    row_means = df.mean(axis=1)
    df['prediction'] = row_means.astype(int)

In [11]:
# Add the prediction column to df (dummy_model modifies the frame in place),
# then preview the first five rows.
dummy_model(df)
df.head(n=5)

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,prediction
0,5.1,3.5,1.4,0.2,2
1,4.9,3.0,1.4,0.2,2
2,4.7,3.2,1.3,0.2,2
3,4.6,3.1,1.5,0.2,2
4,5.0,3.6,1.4,0.2,2


In [12]:
# Show the distinct values the dummy model produced.
df.loc[:, 'prediction'].unique()

array([2, 3, 4, 5])

### Загружаем данные в БД MySQL (LOAD)

In [None]:
# Extra keyword arguments passed through to the DBAPI connect call;
# ssl_ca is the path to the CA certificate.
SSL_ARGS = {"ssl_ca": SSL_PATH}

# Build the URL with URL.create instead of string interpolation so that special
# characters in the credentials (e.g. '@', '/', ':') are escaped rather than
# corrupting the URL. It is rendered back to a string so CONNECTION_URL_MYSQL
# keeps its original type.
CONNECTION_URL_MYSQL = URL.create(
    drivername="mysql+pymysql",
    username=DB_USER,
    password=DB_PASS,
    host=DB_HOST,
    # Environment variables are strings; URL.create expects an integer port.
    port=int(DB_PORT) if DB_PORT else None,
    database=DB_NAME,
).render_as_string(hide_password=False)

ENGINE_MYSQL = create_engine(CONNECTION_URL_MYSQL, connect_args=SSL_ARGS)

In [27]:
def execute(query, con, params=None):
    """Run a statement inside its own transaction.

    Args:
        query: Either a raw SQL string (using the driver's own placeholder
            style, e.g. ``%s``) or a SQLAlchemy executable such as ``text(...)``.
        con: Engine-like object whose ``connect()`` returns a connection.
        params: Optional bind parameters; a list of parameter sets is passed
            through unchanged. A falsy value means "no parameters".

    The transaction opened by ``connection.begin()`` is committed when the
    block exits normally and rolled back if the statement raises.
    """
    with con.connect() as connection:
        with connection.begin():
            if isinstance(query, str):
                # Connection.execute() no longer accepts plain strings in
                # SQLAlchemy 2.0; exec_driver_sql sends the string to the
                # driver as-is, with driver-style parameters.
                connection.exec_driver_sql(query, params or None)
            elif params:
                connection.execute(query, params)
            else:
                connection.execute(query)

In [34]:
# Create the target table in MySQL (no-op if it already exists).
create_table_query = """
    CREATE TABLE IF NOT EXISTS iris_predictions (
        id INT AUTO_INCREMENT PRIMARY KEY,
        sepal_length FLOAT,
        sepal_width FLOAT,
        petal_length FLOAT,
        petal_width FLOAT,
        prediction INT
    )
"""

# Wrap the SQL in text(): Connection.execute() rejects plain strings in SQLAlchemy 2.0.
execute(text(create_table_query), con=ENGINE_MYSQL)

In [None]:
# Load the predictions into the table.
# NOTE: re-running this cell inserts the same rows again.
# Named bind parameters (:name) are used together with text() so the statement
# does not depend on the driver's placeholder style.
insert_query = """
INSERT INTO iris_predictions (
    sepal_length,
    sepal_width,
    petal_length,
    petal_width,
    prediction
)
VALUES (:sepal_length, :sepal_width, :petal_length, :petal_width, :prediction)
"""

# Columns sent to the database, matching the bind parameter names above.
INSERT_COLUMNS = [
    "sepal_length",
    "sepal_width",
    "petal_length",
    "petal_width",
    "prediction",
]
# Number of rows sent per execute() call; each call runs in its own transaction.
BATCH_SIZE = 1000

insert_statement = text(insert_query)
total_rows = df.shape[0]

try:
    for start in range(0, total_rows, BATCH_SIZE):
        batch = df.iloc[start : start + BATCH_SIZE]
        # One dict per row keyed by column name. Unlike iterrows(), this does
        # not upcast every row to a single dtype, so `prediction` stays an integer.
        values = batch[INSERT_COLUMNS].to_dict(orient="records")
        execute(insert_statement, ENGINE_MYSQL, values)

except Exception as e:
    print(f"An error occurred while inserting predictions: {e}")
    # Re-raise so the notebook does not carry on as if the load had succeeded.
    raise

In [None]:
# Read the data back from the table.
def fetch_all(query, con):
    """Execute a SQL string on a fresh connection and return all result rows.

    Args:
        query: SQL text; it is wrapped in ``text()`` before execution.
        con: Engine-like object whose ``connect()`` returns a connection.

    Returns:
        The list of rows produced by ``fetchall()``.
    """
    with con.connect() as connection:
        statement = text(query)
        return connection.execute(statement).fetchall()


# Pull every row of the predictions table, then print only the first five.
select_query = "SELECT * FROM iris_predictions"
rows = fetch_all(select_query, con=ENGINE_MYSQL)

for row in rows[:5]:
    print(row)

(1, 5.1, 3.5, 1.4, 0.2, 2)
(2, 4.9, 3.0, 1.4, 0.2, 2)
(3, 4.7, 3.2, 1.3, 0.2, 2)
(4, 4.6, 3.1, 1.5, 0.2, 2)
(5, 5.0, 3.6, 1.4, 0.2, 2)
