In [None]:
!pip install -U apache-airflow
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


Collecting apache-airflow
  Downloading apache_airflow-2.3.2-py3-none-any.whl (5.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.2/5.2 MB[0m [31m728.2 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h

In [None]:


def Get_DF_i(Day, country='United Kingdom',
             base_url='https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports'):
        """Fetch one JHU CSSE daily report and return the rows for one country.

        Args:
            Day: Report date string (the pipeline passes ``MM-DD-YYYY``). It is
                used to build the report location ``{base_url}/{Day}.csv`` and
                is stored unchanged in the added ``Day`` column.
            country: Value matched exactly against the report's
                ``Country_Region`` column. Defaults to ``'United Kingdom'``.
            base_url: Directory URL (or local directory path) holding the
                daily report CSVs. Defaults to the JHU CSSE GitHub raw URL.

        Returns:
            A DataFrame with the columns in ``Selec_columns`` and a fresh
            RangeIndex. Returns ``None`` (after printing a message) when the
            report cannot be read or lacks the expected columns.
        """
        import pandas as pd
        URL_Day = f'{base_url}/{Day}.csv'
        Selec_columns = ['Day', 'Country_Region', 'Last_Update',
                         'Lat', 'Long_', 'Confirmed', 'Deaths', 'Recovered', 'Active',
                         'Combined_Key', 'Incident_Rate', 'Case_Fatality_Ratio']
        try:
            DF_day = pd.read_csv(URL_Day)
            DF_day['Day'] = Day
            cond = DF_day['Country_Region'] == country
            return DF_day.loc[cond, Selec_columns].reset_index(drop=True)
        except Exception as err:
            # Deliberate best effort: a missing report (HTTP/network/file error)
            # or a report with a different column schema (KeyError) is skipped
            # so one bad day does not fail the whole extraction. Unlike a bare
            # `except:`, this does not swallow KeyboardInterrupt/SystemExit.
            print(f'{Day} is not available! ({type(err).__name__}: {err})')
            return None

def Create_data():
        """Build the UK COVID-19 series and write it to ``/opt/airflow/data/DF_UK.csv``.

        Requests the daily reports for the 144 days before today, oldest
        first, via ``Get_DF_i``. Today itself is not requested. Days whose
        report is unavailable are skipped. The remaining rows are
        concatenated, the date columns are parsed, and the result is written
        indexed by ``Day``.

        Raises:
            ValueError: if none of the requested daily reports could be read.
        """
        import datetime
        import pandas as pd

        today = datetime.datetime.today()
        # Offsets 144, 143, ..., 1 days back -> 'MM-DD-YYYY', the JHU file-name format.
        List_of_Days = [
            (today - datetime.timedelta(days=145 - i)).strftime('%m-%d-%Y')
            for i in range(1, 145)
        ]

        # Get_DF_i returns None for unavailable days; drop those explicitly.
        DF_all = [DF_i for DF_i in map(Get_DF_i, List_of_Days) if DF_i is not None]
        if not DF_all:
            raise ValueError(
                f'No daily reports available between {List_of_Days[0]} and {List_of_Days[-1]}')

        DF_UK = pd.concat(DF_all).reset_index(drop=True)
        DF_UK['Last_Update'] = pd.to_datetime(DF_UK['Last_Update'])
        # Day values were produced above with '%m-%d-%Y', so parse with that exact format
        # (infer_datetime_format is deprecated in pandas 2).
        DF_UK['Day'] = pd.to_datetime(DF_UK['Day'], format='%m-%d-%Y')
        DF_UK['Case_Fatality_Ratio'] = DF_UK['Case_Fatality_Ratio'].astype(float)
        DF_UK.set_index('Day', inplace=True)
        DF_UK.to_csv('/opt/airflow/data/DF_UK.csv')

def MinMaxScaler():
        """Min-max scale the UK series and write it to ``/opt/airflow/data/DF_UK_Scaled.csv``.

        Reads ``/opt/airflow/data/DF_UK.csv`` (written by ``Create_data``).
        Rescales ``Confirmed``, ``Deaths``, ``Recovered``, ``Active`` and
        ``Case_Fatality_Ratio`` with scikit-learn's ``MinMaxScaler``. Writes
        only those columns, indexed by ``Day``.
        """
        import pandas as pd
        # Aliased because this task function is itself named MinMaxScaler.
        from sklearn.preprocessing import MinMaxScaler as _SkMinMaxScaler

        DF_UK = pd.read_csv('/opt/airflow/data/DF_UK.csv', parse_dates=['Last_Update'])
        DF_UK.set_index('Day', inplace=True)

        Select_Columns = ['Confirmed', 'Deaths', 'Recovered', 'Active', 'Case_Fatality_Ratio']
        scaled = _SkMinMaxScaler().fit_transform(DF_UK[Select_Columns])
        # Build the result on the source index directly (keeps the 'Day' index name)
        # instead of attaching the index afterwards.
        DF_UK_3 = pd.DataFrame(scaled, columns=Select_Columns, index=DF_UK.index)
        DF_UK_3.to_csv('/opt/airflow/data/DF_UK_Scaled.csv')

def Plotting():
        """Plot the scaled UK series to ``/opt/airflow/output/UK_scoring_report.png``.

        Reads ``/opt/airflow/data/DF_UK_Scaled.csv``, indexed by the parsed
        ``Day`` column. Draws the five scaled columns on one line chart and
        saves it.
        """
        import pandas as pd
        import matplotlib.pyplot as plt

        DF_UK_u_3 = pd.read_csv('/opt/airflow/data/DF_UK_Scaled.csv', parse_dates=['Day'])
        DF_UK_u_3.set_index('Day', inplace=True)
        Select_Columns = ['Confirmed', 'Deaths', 'Recovered', 'Active', 'Case_Fatality_Ratio']
        ax = DF_UK_u_3[Select_Columns].plot(figsize=(30, 20))
        fig = ax.get_figure()
        try:
            fig.savefig('/opt/airflow/output/UK_scoring_report.png')
        finally:
            # pyplot keeps every figure alive until it is closed; without this,
            # repeated runs in a long-lived worker process accumulate figures.
            plt.close(fig)
    

def CSV_to_Postgres():
        """Load the scaled UK series into the Postgres table ``UK_scoring_report``.

        Reads ``/opt/airflow/data/DF_UK_Scaled.csv`` (written by the
        ``MinMaxScaler`` task). Replaces the table's contents, keeping ``Day``
        as a column.
        """
        from sqlalchemy import create_engine
        import pandas as pd

        # The scaling task writes DF_UK_Scaled.csv; no task in this DAG writes a
        # UK_scoring_report.csv (only the plot, UK_scoring_report.png, uses that name).
        DF_UK_Scaled = pd.read_csv('/opt/airflow/data/DF_UK_Scaled.csv', parse_dates=['Day'])
        DF_UK_Scaled.set_index('Day', inplace=True)

        # NOTE(review): credentials are hard-coded; consider an Airflow Connection
        # or environment variables instead.
        host = "postgresDev"
        database = "testDB"
        user = "me"
        password = "1234"
        port = '5432'
        engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{database}')
        try:
            # index=True writes the 'Day' index as a column; with index=False the
            # dates were silently dropped from the table.
            DF_UK_Scaled.to_sql('UK_scoring_report', engine, if_exists='replace', index=True)
        finally:
            # Release pooled connections held by this task's engine.
            engine.dispose()

 
 
# Arguments applied to every task in the DAG.
default_args = {
    'owner': 'Huda',
    'start_date': dt.datetime(2022, 5, 30),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=1),
}

# Daily pipeline: install deps -> extract -> scale -> plot -> load into Postgres.
with DAG('UK_COVID19_dag',
         default_args=default_args,
         schedule_interval=timedelta(days=1),
         catchup=False,
         ) as dag:

    # The PyPI distribution is "scikit-learn"; the "sklearn" package is a
    # deprecated placeholder that now refuses to install.
    Install_dependecies = BashOperator(task_id='Install_dependecies', bash_command='pip install scikit-learn matplotlib')
    Extract_Data = PythonOperator(task_id='Extract_UK_Data', python_callable=Create_data)
    Scaled_Data = PythonOperator(task_id='Scale_UK_Data', python_callable=MinMaxScaler)
    Ploting_Data = PythonOperator(task_id='Plot_UK_Data', python_callable=Plotting)
    Push_Data_to_Postgres = PythonOperator(task_id='Push_Data_to_Postgres', python_callable=CSV_to_Postgres)

    Install_dependecies >> Extract_Data >> Scaled_Data >> Ploting_Data >> Push_Data_to_Postgres