In [2]:
%%writefile /home/dags/assignment_2.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import subprocess
from sqlalchemy import create_engine
import pandas as pd


host = "postgres_storage"
database = "csv_db"
user = "aawadallah"
password = "1234"
port = '5432'


engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{database}')

def Get_DF_i(Day):
    DF_i=None
    
    try: 
        URL_Day=f'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/{Day}'
        DF_day=pd.read_csv(URL_Day)
        DF_day['Day']=Day.split('.')[0]
        cond=(DF_day.Country_Region=='Germany')&(DF_day.Province_State=='Berlin')
        Selec_columns=['Day','Country_Region', 'Last_Update',
          'Lat', 'Long_', 'Confirmed', 'Deaths', 'Recovered', 'Active',
          'Combined_Key', 'Incident_Rate', 'Case_Fatality_Ratio']
        DF_i=DF_day[cond][Selec_columns].reset_index(drop=True)
    except:
        pass
    
    return DF_i



def _fetch_data_as_DF(**context):
    # this to grep all the files names  from the repo 
    CMD = "curl -s https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_daily_reports | grep -Eo '[0-9-]*.csv' | sort -Vu"
    output = subprocess.check_output(CMD, shell=True)
    List_of_days = output.decode('utf-8').split('\n')
    List_of_days = [line for line in List_of_days if line.strip() != ""]
    #Appending all data. 
    # lst_all_DFs= multiprocessing.Pool().map(Get_DF_i, List_of_days)  I've tried to multiprocce the data but seems like it's not allowed in airflow,
    #AssertionError: daemonic processes are not allowed to have children
    
    lst_all_DFs=[]
    for Day in List_of_days:
        lst_all_DFs.append(Get_DF_i(Day))
    
    #ConvertList to DF 
    DF_all = pd.concat(lst_all_DFs).reset_index(drop=True)
    DF_all.to_csv('/home/sharedVol/data.csv')



def _minMax_scale_data(**context):
    DF_Germany=pd.read_csv('/home/sharedVol/data.csv')
    Selec_Columns=['Confirmed','Deaths', 'Recovered', 'Active', 'Incident_Rate','Case_Fatality_Ratio']
    DF_Germany_2 = DF_Germany[Selec_Columns]


    from sklearn.preprocessing import MinMaxScaler

    min_max_scaler = MinMaxScaler()
    DF_Germany_3 = pd.DataFrame(min_max_scaler.fit_transform(DF_Germany_2),columns=Selec_Columns)
    DF_Germany_3.index=DF_Germany_2.index
    DF_Germany_3['Day']=DF_Germany.Day
    DF_Germany_3.to_csv('/home/sharedVol/Scaleddata.csv')

    



def _push_data_to_postgress_and_Plot(**context):
    DF_Germany=pd.read_csv('/home/sharedVol/data.csv')
    DF_Germany_3=pd.read_csv('/home/sharedVol/Scaleddata.csv')
    DF_Germany.to_sql('data_without_scaling', engine,if_exists='replace',index=False)
    DF_Germany_3.to_sql('data_with_scaling', engine,if_exists='replace',index=False)
    
    import matplotlib.pyplot as plt 
    import matplotlib
    font = {'weight' : 'bold',
            'size'   : 18}

    matplotlib.rc('font', **font)
    Selec_Columns=['Confirmed','Deaths', 'Recovered', 'Active', 'Incident_Rate','Case_Fatality_Ratio']
    DF_Germany_3[Selec_Columns].plot(figsize=(20,10))
    plt.savefig('/home/output/germany_scoring_report.png')
    DF_Germany_3.to_csv('/home/output/germany_scoring_report.csv')
    


def _install_tools():

    try:
        import psycopg2
    except:
        subprocess.check_call(['pip', 'install', 'psycopg2-binary'])
        import psycopg2

    try:
        from sqlalchemy import create_engine
    except:
        subprocess.check_call(['pip', 'install', 'sqlalchemy'])
        from sqlalchemy import create_engine
        
    try:
        import pandas as pd
    except:
        subprocess.check_call(['pip', 'install', 'pandas'])
        import pandas as pd
        
    try:
        import matplotlib 
    except:
        subprocess.check_call(['pip', 'install', 'matplotlib'])
        import matplotlib
        
    try:
        import sklearn 
    except:
        subprocess.check_call(['pip', 'install', 'sklearn'])
        import sklearn        



with DAG("ETL_JHC", start_date=datetime(2021, 1, 1),
         schedule_interval="0 1 * * *", catchup=False) as dag: #to run it everyday at 1 PM
    install_tools = PythonOperator(
        task_id="install_tools",
        python_callable=_install_tools,
        provide_context=True
    )
    
    fetchData = PythonOperator(
        task_id="fetch_data_and_save_it_to_filesystem",
        python_callable=_fetch_data_as_DF,
        provide_context=True
    )

    minMaxScaleData = PythonOperator(
        task_id="minMax_Scale_data",
        python_callable=_minMax_scale_data,
        provide_context=True
    )

    pushDataToPG = PythonOperator(
        task_id="push_data_to_postgress_and_polt_report",
        python_callable=_push_data_to_postgress_and_Plot,
        provide_context=True
    )

    install_tools >> fetchData >> minMaxScaleData >> pushDataToPG

Overwriting /home/dags/assignment_2.py


In [6]:
try:
    from faker import Faker
except:
   !pip install faker 
   from faker import Faker
    
try:
    import psycopg2 
except:
    !pip install psycopg2-binary 
    import psycopg2
    
try:
    from sqlalchemy import create_engine
except:
    !pip install sqlalchemy
    from sqlalchemy import create_engine
    
    
try:
    import pandas as pd 
except:
    !pip install pandas
    import pandas as pd 
     
try:
    import matplotlib 
except:
    !pip install matplotlib
    import matplotlib

try:
    import sklearn 
except:
    !pip install sklearn
    import sklearn

Collecting faker
  Downloading Faker-8.4.0-py3-none-any.whl (1.2 MB)
[K     |████████████████████████████████| 1.2 MB 497 kB/s eta 0:00:01
Collecting text-unidecode==1.3
  Downloading text_unidecode-1.3-py2.py3-none-any.whl (78 kB)
[K     |████████████████████████████████| 78 kB 1.1 MB/s eta 0:00:01
Installing collected packages: text-unidecode, faker
Successfully installed faker-8.4.0 text-unidecode-1.3
Collecting psycopg2-binary
  Downloading psycopg2_binary-2.8.6-cp38-cp38-manylinux1_x86_64.whl (3.0 MB)
[K     |████████████████████████████████| 3.0 MB 760 kB/s eta 0:00:01
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.8.6
Collecting matplotlib
  Downloading matplotlib-3.4.2-cp38-cp38-manylinux1_x86_64.whl (10.3 MB)
[K     |████████████████████████████████| 10.3 MB 477 kB/s eta 0:00:01
[?25hCollecting kiwisolver>=1.0.1
  Downloading kiwisolver-1.3.1-cp38-cp38-manylinux1_x86_64.whl (1.2 MB)
[K     |████████████████████████████████| 

In [7]:
from sqlalchemy import create_engine
import pandas as pd


host = "postgres_storage"
database = "csv_db"
user = "aawadallah"
password = "1234"
port = '5432'


engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{database}')


scores_extracted=pd.read_sql("SELECT * FROM data_with_scaling" , engine);
scores_extracted.head(10)


Unnamed: 0.1,Unnamed: 0,Confirmed,Deaths,Recovered,Active,Incident_Rate,Case_Fatality_Ratio,Day
0,0,0.414843,0.306822,0.365525,0.7883,0.264754,0.389571,01-01-2021
1,1,0.41818,0.309337,0.367915,0.793958,0.266884,0.39096,01-02-2021
2,2,0.422938,0.315624,0.375865,0.765119,0.269921,0.398892,01-03-2021
3,3,0.428531,0.32254,0.383254,0.746993,0.27349,0.407042,01-04-2021
4,4,0.43749,0.342345,0.392163,0.739907,0.279207,0.438756,01-05-2021
5,5,0.446963,0.359635,0.400894,0.738588,0.285253,0.463639,01-06-2021
6,6,0.458345,0.372839,0.408365,0.762538,0.292517,0.476751,01-07-2021
7,7,0.469073,0.391701,0.414076,0.794672,0.299364,0.502086,01-08-2021
8,8,0.474383,0.395787,0.418226,0.800879,0.302753,0.503531,01-09-2021
9,9,0.476849,0.397359,0.427539,0.744576,0.304327,0.503524,01-10-2021
