In [None]:
from datetime import datetime, timedelta
from pathlib import Path
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail
from email import message

import requests
import json
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values
import os
import smtplib

def conexion_redshift():
    url="data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com"
    data_base="data-engineer-database"
    user="diegoalonsotelloalva1998_coderhouse"
    pwd= 'KTs5N49gGd'

    try:
        conn = psycopg2.connect(
            host='data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com',
            dbname=data_base,
            user=user,
            password=pwd,
            port='5439'
        )
        print("Conectado a Redshift con éxito!")
    
    except Exception as e:
        print("No es posible conectar a Redshift")
    
    cur.execute("""
        CREATE TABLE IF NOT EXISTS diegoalonsotelloalva1998_coderhouse.bitcoins
        (
         compra_bitcon_pesoarg FLOAT
        ,comision_bitcon_pesoarg FLOAT 
        ,venta_bitcon_pesoarg FLOAT 
        ,compra_trxusdt_pesoarg FLOAT
        ,comision_trxusdt_pesoarg FLOAT
        ,venta_trxusdt_pesoarg FLOAT
        ,fecha_actualizacion VARCHAR(255)
        ,comision_retiro FLOAT
        )
    """)
    conn.commit()

def extraer_data_api():
    #Paso 1 Extraer la data

    data = []

    response = requests.get('https://www.bitmonedero.com/api/btc-ars')
    data_json = json.loads(response.text)
    data.append(data_json)

    #Paso 2 Crear el diccionario

    diccionario = {'compra_bitcon_pesoarg':[],'comision_bitcon_pesoarg':[],'venta_bitcon_pesoarg':[],'compra_trxusdt_pesoarg':[]
                   ,'comision_trxusdt_pesoarg':[],'venta_trxusdt_pesoarg':[],'fecha_actualizacion':[],'comision_retiro':[]}

    # compra_bitcon_pesoarg
    # comision_bitcon_pesoarg
    # venta_bitcon_pesoarg
    # compra_trxusdt_pesoarg
    # comision_trxusdt_pesoarg
    # venta_trxusdt_pesoarg
    # fecha_actualizacion
    # comision_retiro

    #Extraer cada campo de la data

    for i in data:
        compra_bitcon_pesoarg = i['buy_btc_ars']
        comision_bitcon_pesoarg= i['buy_btc_ars_fee']
        venta_bitcon_pesoarg= i['sell_btc_ars']
        compra_trxusdt_pesoarg= i['buy_trxusdt_ars']
        comision_trxusdt_pesoarg = i['buy_trxusdt_ars_fee']
        venta_trxusdt_pesoarg= i['sell_trxusdt_ars']
        fecha_actualizacion= i['updated_at_prices']
        comision_retiro= i['withdrawal_fee']

        diccionario['compra_bitcon_pesoarg'].append(compra_bitcon_pesoarg)
        diccionario['comision_bitcon_pesoarg'].append(comision_bitcon_pesoarg)
        diccionario['venta_bitcon_pesoarg'].append(venta_bitcon_pesoarg)
        diccionario['compra_trxusdt_pesoarg'].append(compra_trxusdt_pesoarg)
        diccionario['comision_trxusdt_pesoarg'].append(comision_trxusdt_pesoarg)
        diccionario['venta_trxusdt_pesoarg'].append(venta_trxusdt_pesoarg)
        diccionario['fecha_actualizacion'].append(fecha_actualizacion)
        diccionario['comision_retiro'].append(comision_retiro)

    df = pd.DataFrame(diccionario)
    display(df)

def insertando_data_api():
    from psycopg2.extras import execute_values
    with conn.cursor() as cur:
        execute_values(
            cur,
            '''
            INSERT INTO bitcoins (compra_bitcon_pesoarg,comision_bitcon_pesoarg,venta_bitcon_pesoarg,compra_trxusdt_pesoarg
            ,comision_trxusdt_pesoarg,venta_trxusdt_pesoarg,fecha_actualizacion,comision_retiro)
            VALUES %s
            ''',
            [tuple(row) for row in df.values],
            page_size=len(df)
        )
        conn.commit()
        
        cur.close()
        conn.close()

    user = Variable.get("SECRET_EMAIL")
    pwd_email = Variable.get("SECRET_PWD_EMAIL")

    try:
        x=smtplib.SMTP('smtp.gmail.com', 587)
        x.starttls()
        x.login(user, pwd_email)
        subject='Aviso - Pipeline completado'
        body_text='Los datos han sido actualizados.'
        message='Subject: {}\n\n{}'.format(subject, body_text)
        x.sendmail(user, 'jese_salazar@hotmail.com', message)
        print('Email enviado con éxito')
    except Exception as exception:
        print(exception)
        print('Falló el envío del email')
        
## TAREAS

default_args = {
    'start_date': datetime(2024, 4, 14),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}


ingestion_dag = DAG(
    dag_id='ingestion_data',
    default_args=default_args,
    description='Agrega datos de los tipos de cambio en bitcoins',
     schedule_interval=timedelta(days=1),
    catchup=False
)


task_1 = PythonOperator(
    task_id='conexion_redshift',
    python_callable=conexion_redshift,
    #op_args=["{{ ds }} {{ execution_date.hour }}"],
    dag=ingestion_dag,
)

task_2 = PythonOperator(
    task_id='extraer_data_api',
    python_callable=extraer_data_api,
    dag=ingestion_dag,
)

task_3 = PythonOperator(
    task_id='insertando_data_api',
    python_callable=insertando_data_api,
    dag=ingestion_dag,
)




task_1 >> task_2 >> task_3