In [None]:
import boto3
from datetime import datetime, timezone, timedelta
import botocore
import pandas as pd
import time
from sqlalchemy import create_engine, text
from sqlalchemy_utils import database_exists, create_database
import pandas as pd
from logging import basicConfig, info, INFO
import awswrangler as wr
import os
import pyspark
import json
basicConfig(level=INFO)


class MysqlConnection:

    def __init__(self, user: str, passwd: str, host: str, port: str = '3306') -> None:
        self.USER = user
        self.PASSWD = passwd
        self.HOST = host
        self.PORT = port
        self.SQLALCHEMY_DATABASE_URL = f"mysql+pymysql://{self.USER}:{self.PASSWD}@{self.HOST}:{self.PORT}/grupo4_sensores"

    def connect(self) -> None:
        info(f'Connecting to {self.HOST}...')
        self.engine = create_engine(
            self.SQLALCHEMY_DATABASE_URL, echo=False, pool_pre_ping=True)
        if not database_exists(self.engine.url):
            create_database(self.engine.url)
        info(f'Connected to {self.HOST} sucessfully!')

    def insert_dataframe(
            self,
            df: pd.DataFrame,
            table: str,
            schema: str,
            if_exists: str = 'append',
            index: bool = False
    ) -> None:
        info(
            f'Inserting dataframe into table={table} and schema={schema}...')
        df.to_sql(name=table, schema=schema, con=self.engine,
                  if_exists=if_exists, index=index, )
        info(
            f'Inserted dataframe into table={table} and schema={schema} sucessfully!')

    def read_sql(self, query: str) -> pd.DataFrame:
        info('Reading sql...')
        df = pd.read_sql_query(sql=text(query), con=self.engine.connect())
        return df

    def disconnect(self) -> None:
        info(f'Disconnecting database...')
        self.engine.dispose()
        info(f'Disconnected database sucessfully!')


while True:
    today = datetime.now(timezone.utc)
    def get_last_modified(obj): return int(obj['LastModified'].strftime('%s'))

    s3 = boto3.client('s3')
    objs = s3.list_objects_v2(Bucket='stagged-sprint-3')['Contents']
    last_added = [obj['Key']
                  for obj in sorted(objs, key=get_last_modified)][-1]
    BUCKET_NAME = 'stagged-sprint-3'  # replace with your bucket name
    KEY = last_added  # replace with your object key
    s3 = boto3.resource('s3')
    try:
        s3.Bucket(BUCKET_NAME).download_file(KEY, last_added)
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            print("The object does not exist.")
        else:
            raise

    with open(last_added, 'r') as f:
        file = f.read()

    info_df = json.loads(file)
    date = datetime.now()
    for obj in info_df:
        date += timedelta(minutes=5)
        obj.update({"data_hora": (date).strftime('%d/%m/%y %H:%M:%S')})

    df = pd.DataFrame(info_df)
    os.remove(last_added)
    last_added = last_added.replace(".json", ".csv")
    df.to_csv(last_added, index=False)
    s3.meta.client.upload_file(last_added, 'consumed-sprint-3', last_added)

    mysql_connection = MysqlConnection(
        user="soybean", passwd="root#2023", host="bd-soybean.mysql.database.azure.com")

    mysql_connection.connect()
    mysql_connection.insert_dataframe(
        df, table="sensores", schema="grupo4_sensores")
    mysql_connection.disconnect()

    time.sleep(15)


INFO:root:Connecting to bd-soybean.mysql.database.azure.com...
INFO:root:Connected to bd-soybean.mysql.database.azure.com sucessfully!
INFO:root:Inserting dataframe into table=sensores and schema=grupo4_sensores...
INFO:root:Inserted dataframe into table=sensores and schema=grupo4_sensores sucessfully!
INFO:root:Disconnecting database...
INFO:root:Disconnected database sucessfully!
INFO:root:Connecting to bd-soybean.mysql.database.azure.com...
INFO:root:Connected to bd-soybean.mysql.database.azure.com sucessfully!
INFO:root:Inserting dataframe into table=sensores and schema=grupo4_sensores...
INFO:root:Inserted dataframe into table=sensores and schema=grupo4_sensores sucessfully!
INFO:root:Disconnecting database...
INFO:root:Disconnected database sucessfully!
INFO:root:Connecting to bd-soybean.mysql.database.azure.com...
INFO:root:Connected to bd-soybean.mysql.database.azure.com sucessfully!
INFO:root:Inserting dataframe into table=sensores and schema=grupo4_sensores...
INFO:root:Inser