In [1]:
from extractors import gov_extract, local_extract, web_scrapping, twitter_extract
import extractors.config as config
import pandas as pd
import numpy as np
from functools import reduce

# Extract

The extraction process is delegated to the components in the `extractors` folder, which use the config files in the `configs` folder to extract data from APIs, local files and web scraping. This is useful because it lets me modularize the code and make the extractors more scalable.

The complete process consists of using `gov_extract`, `local_extract`, `web_scrapping` and `twitter_extract` to get data from the different data sources specified in the [README](../README.md), and finally loading the raw data into memory for later use in the transform phase.

In [5]:
# Load the raw inputs into memory. Each loader returns a pair that is unpacked
# here; the four results are later handled as pandas DataFrames in the
# Transform section.
# NOTE(review): presumably gov_extract reads the government APIs and
# local_extract reads files on disk -- confirm in the extractors package.
daily_cases, daily_test = gov_extract.load()
employe, inflation = local_extract.load()

In [2]:
# Scrape the news articles. The result is a sequence of chunks that the next
# cell concatenates with `+`; each article is later read as a mapping with the
# keys 'title', 'publised_time', 'category' and 'resume'.
# NOTE(review): a failed request is only printed by the scraper (see the 404 in
# the output); confirm that a failed page contributes an empty chunk.
news = web_scrapping.load()



404 Client Error: Not Found for url: https://www.eltiempo.com/buscar/88?q=covid&category=salud&publishedAt%5Bfrom%5D=20-02-01&publishedAt%5Buntil%5D=20-12-01&contentTypes%5B0%5D=article


In [3]:
# Flatten the scraped chunks into a single list of articles.
# The explicit [] initial value keeps the cell working when nothing was scraped:
# reduce() without an initializer raises TypeError on an empty sequence.
# NOTE(review): `news` is overwritten, so run this cell once per
# web_scrapping.load(); assumes every chunk is a list -- confirm.
news = reduce(lambda articles, chunk: articles + chunk, news, [])

In [4]:
# Download the tweets. The result is iterated later as mappings that may carry
# the keys 'id', 'text' and 'fecha'; entries without an 'id' are replaced by a
# 'nan' placeholder row in the Transform section.
# NOTE(review): the searchtweets log below is emitted during this call; consider
# raising the log level before sharing the notebook.
tweets = twitter_extract.load()

INFO:searchtweets.result_stream:using bearer token for authentication
INFO:searchtweets.result_stream:paging; total requests read so far: 1
INFO:searchtweets.result_stream:paging; total requests read so far: 2
INFO:searchtweets.result_stream:paging; total requests read so far: 3
INFO:searchtweets.result_stream:paging; total requests read so far: 4
INFO:searchtweets.result_stream:paging; total requests read so far: 5
INFO:searchtweets.result_stream:paging; total requests read so far: 6
INFO:searchtweets.result_stream:ending stream at 640 tweets
INFO:searchtweets.result_stream:using bearer token for authentication
INFO:searchtweets.result_stream:paging; total requests read so far: 1
INFO:searchtweets.result_stream:paging; total requests read so far: 2
INFO:searchtweets.result_stream:paging; total requests read so far: 3
INFO:searchtweets.result_stream:paging; total requests read so far: 4
INFO:searchtweets.result_stream:ending stream at 473 tweets
INFO:searchtweets.result_stream:using be

# Transform

In [73]:
import itertools

# Header for the employment sheet: the 'Concepto' column followed by one
# '1/<month>/<year>' label per month from January 2001 onwards.
year_list = list(range(2001, 2021))
month_list = list(range(1, 13))

month_labels = [
    '{}/{}/{}'.format(1, str(month), str(year))
    for year, month in itertools.product(year_list, month_list)
]

# The last two months (November and December 2020) are left out, so the final
# label is 1/10/2020.
column_names = ['Concepto'] + month_labels[:-2]

In [74]:
# Rows with missing values are discarded before the sheet is relabelled.
employe_complete = employe.dropna()

# Pair the sheet's current column labels with the generated month labels.
column_dict = dict(zip(employe_complete.columns, column_names))

# After transposing, the surviving row labels of the sheet become columns;
# these are the ones given readable indicator names.
indicator_names = {
    12: "TGP",
    13: "TO",
    14: "TD",
    16: "Ocupados",
    17: "Desocupados",
    18: "Inactivos"
}

employe_processed = (
    employe_complete
    .rename(columns = column_dict)
    .transpose()                    # one row per month, one column per indicator
    .drop(['Concepto'])             # the 'Concepto' row only holds the indicator captions
    .rename(columns = indicator_names)
    # Keep the last 8 months so the table lines up with the daily positive
    # cases and tests.
    .iloc[-8:]
    .reset_index()
    .rename(columns = {'index': 'fecha'})
)

employe_processed

Unnamed: 0,fecha,TGP,TO,TD,Ocupados,Desocupados,Inactivos
0,1/3/2020,60.1793,52.7533,12.3397,20859.9,2936.39,15746.1
1,1/4/2020,51.4751,41.2136,19.9349,16374.0,4076.86,19278.8
2,1/5/2020,55.3874,43.6973,21.1061,17375.4,4648.35,17739.3
3,1/6/2020,57.2966,45.4439,20.6866,18214.6,4750.77,17116.2
4,1/7/2020,57.2245,45.9524,19.698,18191.5,4462.35,16933.9
5,1/8/2020,59.2239,49.1646,16.9851,19608.7,4012.0,16263.0
6,1/9/2020,60.2381,50.5068,16.1547,20172.7,3886.71,15881.1
7,1/10/2020,60.9711,51.4164,15.6708,20626.3,3832.98,15656.9


In [75]:
def _first_of_month(raw_period):
    """Build the '1/<month>/<year>' label from a value whose text starts with YYYYMM."""
    text = str(raw_period)
    return '{}/{}/{}'.format(1, int(text[4:6]), text[0:4])


# Rows 7..340 of the sheet are kept and 'Unnamed: 5' is discarded; the remaining
# columns get descriptive names.
inflation_named = (
    inflation[7:341]
    .drop(columns = ['Unnamed: 5'])
    .rename(columns = {
        "Meta de inflación e inflación total al consumidor": "fecha",
        "Unnamed: 1": "Inflación total",
        "Unnamed: 2": "Límite superior",
        "Unnamed: 3": "Meta de inflación",
        "Unnamed: 4": "Límite inferior"
    })
)

# Keep 8 months so the table lines up with the daily positive cases and tests.
# These are the FIRST eight rows of the slice.
# NOTE(review): the sheet appears to be sorted newest first -- confirm.
inflation_processed = inflation_named[:8].reset_index(drop = True)
inflation_processed['fecha'] = inflation_processed['fecha'].apply(_first_of_month)

# Reverse the rows so they run from the oldest month to the newest one; the
# index keeps its pre-reversal labels.
inflation_processed = inflation_processed.iloc[::-1]
inflation_processed

Unnamed: 0,fecha,Inflación total,Límite superior,Meta de inflación,Límite inferior
7,1/3/2020,3.86,4,3,2
6,1/4/2020,3.51,4,3,2
5,1/5/2020,2.85,4,3,2
4,1/6/2020,2.19,4,3,2
3,1/7/2020,1.97,4,3,2
2,1/8/2020,1.88,4,3,2
1,1/9/2020,1.97,4,3,2
0,1/10/2020,1.75,4,3,2


In [76]:
def _iso_to_day_month_year(stamp):
    """Turn text starting with 'YYYY-MM-DD' into 'D/M/YYYY' without zero padding."""
    parts = stamp.split("-")
    return '{}/{}/{}'.format(int(parts[2][:2]), int(parts[1]), parts[0])


# The row labelled 0 is discarded before any processing.
daily_test_processed = daily_test.drop([0], axis = 0).copy()
daily_test_processed['fecha'] = daily_test_processed['fecha'].apply(_iso_to_day_month_year)

# Aggregated columns are not needed; what remains is 'fecha' plus one column per
# place of origin. Missing counts are treated as zero.
daily_test_processed = (
    daily_test_processed
    .drop(columns = ['acumuladas', 'positivas_acumuladas', 'negativas_acumuladas',
                     'positividad_acumulada', 'indeterminadas'])
    .fillna(value = 0)
)

# Every column after the first one ('fecha') is a place of origin.
cities = list(daily_test_processed.columns)[1:]

# Wide -> long: one (fecha, cantidad, procedencia) record per date and origin,
# in the same row-major order as the wide table. The records are handed to
# pandas directly instead of going through a NumPy array, which would turn
# every value into text first and would fail on an empty table.
transposed_data = [
    (fecha, amount, city.upper())
    for fecha, amounts in zip(daily_test_processed['fecha'],
                              daily_test_processed[cities].itertuples(index = False, name = None))
    for city, amount in zip(cities, amounts)
]

daily_test_transposed = (
    pd.DataFrame(transposed_data, columns = ['fecha', 'cantidad', 'procedencia'])
    .astype({'cantidad': 'float64'})
)
daily_test_transposed

Unnamed: 0,fecha,cantidad,procedencia
0,5/3/2020,0.0,AMAZONAS
1,5/3/2020,0.0,ANTIOQUIA
2,5/3/2020,0.0,ARAUCA
3,5/3/2020,0.0,ATLANTICO
4,5/3/2020,0.0,BOGOTA
...,...,...,...
10369,2/12/2020,12007.0,PROCEDENCIA_DESCONOCIDA
10370,2/12/2020,0.0,BARRANQUILA
10371,2/12/2020,179180.0,CARTAGENA
10372,2/12/2020,77546.0,SANTA_MARTA


In [77]:
def _date_only(value):
    """Keep the text before the first space of a str; return any other value unchanged."""
    if type(value) is str:
        return '{}'.format(value.split(' ')[0])
    return value


# Columns that are not needed downstream, plus the 'index' column that
# reset_index() adds.
unused_case_columns = ['index', 'fecha_de_notificaci_n', 'id_de_caso', 'departamento', 'pais_viajo_1_cod',
                       'ciudad_municipio', 'unidad_medida', 'fecha_reporte_web', 'per_etn_']

daily_cases_processed = daily_cases.reset_index().drop(columns = unused_case_columns)

column_date_list = ['fecha_diagnostico', 'fecha_inicio_sintomas', 'fecha_recuperado', 'fecha_muerte']

# Strip the time part from every date column; non-string entries (missing
# values) are left as they are.
for column in column_date_list:
    daily_cases_processed[column] = daily_cases_processed[column].apply(_date_only)

daily_cases_processed

Unnamed: 0,departamento_nom,ciudad_municipio_nom,edad,sexo,fuente_tipo_contagio,ubicacion,estado,pais_viajo_1_nom,recuperado,fecha_inicio_sintomas,fecha_muerte,fecha_diagnostico,fecha_recuperado,tipo_recuperacion,nom_grupo_
0,BOGOTA,BOGOTA,19,F,Importado,Casa,Leve,ITALIA,Recuperado,27/2/2020,,6/3/2020,13/3/2020,PCR,
1,VALLE,BUGA,34,M,Importado,Casa,Leve,ESPAÑA,Recuperado,4/3/2020,,9/3/2020,19/3/2020,PCR,
2,ANTIOQUIA,MEDELLIN,50,F,Importado,Casa,Leve,ESPAÑA,Recuperado,29/2/2020,,9/3/2020,15/3/2020,PCR,
3,ANTIOQUIA,MEDELLIN,55,M,Relacionado,Casa,Leve,,Recuperado,6/3/2020,,11/3/2020,26/3/2020,PCR,
4,ANTIOQUIA,MEDELLIN,25,M,Relacionado,Casa,Leve,,Recuperado,8/3/2020,,11/3/2020,23/3/2020,PCR,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1333997,BOGOTA,BOGOTA,62,M,En estudio,Hospital,Moderado,,Activo,3/10/2020,,6/10/2020,,,
1333998,BOGOTA,BOGOTA,71,M,En estudio,Hospital,Moderado,,Activo,6/10/2020,,6/10/2020,,,
1333999,BOGOTA,BOGOTA,61,F,En estudio,Hospital,Moderado,,Activo,1/10/2020,,6/10/2020,,,
1334000,BOGOTA,BOGOTA,31,F,En estudio,Casa,Leve,,Activo,6/10/2020,,6/10/2020,,,


In [78]:
def _day_month_year(moment):
    """Format a date-like object as 'D/M/YYYY' without zero padding.

    Built from the .day/.month/.year attributes because the '%-d'/'%-m'
    strftime flags are a platform extension and are rejected on some systems.
    """
    return '{}/{}/{}'.format(moment.day, moment.month, moment.year)


# One row per article; line breaks are removed from the title and the summary.
# NOTE(review): assumes 'publised_time' holds date/datetime objects -- confirm
# in web_scrapping.
news_array = [
    [
        article['title'].replace('\n', ''),
        _day_month_year(article['publised_time']),
        article['category'],
        article['resume'].replace('\n', ''),
    ]
    for article in news
]
news_df = pd.DataFrame(data = news_array, columns = ['titulo', 'fecha', 'categoria', 'resumen'])

# Drop repeated articles, reverse the row order and renumber the rows from 0.
news_df = (
    news_df
    .drop_duplicates(subset = ["titulo", "resumen"])
    .iloc[::-1]
    .reset_index(drop = True)
)
news_df

Unnamed: 0,titulo,fecha,categoria,resumen
0,Coronavirus tiene nombre oficial: OMS lo bauti...,11/2/2020,Salud,Nace del acrónimo en inglés a partir de la exp...
1,"El Covid-19, una amenaza mundial que deja más ...",11/2/2020,Salud,Según advirtió la Organización Mundial de la S...
2,Así sería la cuarentena en el país para los co...,18/2/2020,Salud,Los 14 connacionales que serán evacuados de Ch...
3,No se ha confirmado ningún caso de coronavirus...,19/2/2020,Salud,Directivas del centro hospitalario indicaron q...
4,Así será la evacuación y llegada de los colomb...,21/2/2020,Salud,Ministerio de Salud sostiene que el viaje part...
...,...,...,...,...
862,Minsalud dice que por ahora no pedirá pruebas ...,1/12/2020,Salud,Cartera envió preguntas al juez que con tutela...
863,Europa decide sobre primera vacuna covid-19 an...,1/12/2020,Salud,Agencia Europea de Medicamentos (EMA) puso pla...
864,Colombia registra 182 muertes más y 8.430 nuev...,1/12/2020,Salud,El Ministerio de Salud reportó también 6.037 r...
865,Vacuna a toda costa: reflexiones tras días de ...,1/12/2020,Salud,"Análisis de Alejandro Gaviria, Tatiana Andia, ..."


In [79]:
# One [id, text, fecha] row per tweet; entries without an 'id' key become a
# placeholder row of 'nan' strings.
new_tweets = []
for tweet in tweets:
    if 'id' in tweet:
        new_tweets.append([tweet['id'], tweet['text'], tweet['fecha']])
    else:
        new_tweets.append(['nan', 'nan', 'nan'])

# Drop tweets with repeated text, reverse the row order and renumber from 0.
tweets_df = (
    pd.DataFrame(new_tweets, columns = ['id', 'text', 'fecha'])
    .drop_duplicates(subset = ['text'])
    .iloc[::-1]
    .reset_index(drop = True)
)
tweets_df

Unnamed: 0,id,text,fecha
0,1333925517876146185,@MinSaludCol @INSColombia @ViceColombia @Fruiz...,2020-12-02
1,1333929634744987650,Covid-19: 7.986 Casos Nuevos y 168 fallecidos ...,2020-12-02
2,1333930851743260672,Colombia Profunda N° 7.\nLa vacuna del COVID19...,2020-12-02
3,1333931466808504321,"@SenorCaicedo @IvanDuque No joñe, parece que e...",2020-12-02
4,1333931510592925698,Solo queda conciliación entre cámara y Senado ...,2020-12-02
...,...,...,...
844,1333196006981332993,Resultados #COVID19 en #Colombia: 29 Noviembre...,2020-11-29
845,1333196276016549888,#29Nov Vicepdta. @drodriven2: En las últimas 2...,2020-11-29
846,1333197852428931072,"RT @RNVinformativa: #EnVivo | Presidente, @Nic...",2020-11-29
847,1333198392609091585,RT @enahpiutoficial: #29Nov Vicepdta. @drodriv...,2020-11-29


# Cross Data

In [120]:
import datetime
import math

def get_date_id(df, column, target):
    """Translate the values of ``df[column]`` through the ``target`` mapping.

    Returns a list with one entry per row, in row order: ``target[value]`` when
    the value is a key of ``target`` and -1 otherwise. Despite its name, the
    function is a generic lookup and works with any mapping.
    """
    ids = []
    for value in df[column].tolist():
        ids.append(target.get(value, -1))
    return ids

# Calendar dimension: one row per day for the 310 days ending today, oldest
# first. The row position is the date id used by the other tables.
# NOTE(review): the window is anchored on the day the notebook runs, so date ids
# shift between runs and dates older than the window are looked up as -1;
# consider pinning base_date to the date of the extraction.
base_date = datetime.datetime.today()
dates = [ base_date - datetime.timedelta(days = x) for x in range(310) ]
dates.reverse()

# The 'D/M/YYYY' label and the numeric parts come straight from the date
# attributes: the '%-d'/'%-m' strftime flags are a platform extension and are
# rejected on some systems. The quarter is computed with integer arithmetic
# (months 1-3 -> 1, ..., months 10-12 -> 4).
data = [['{}/{}/{}'.format(date.day, date.month, date.year), date.day, date.month, date.year, (date.month + 2) // 3]
        for date in dates ]

date_table = pd.DataFrame(data, columns = ['cal_date', 'day', 'month', 'year', 'qtr'])

# Label -> row position, used to replace date labels by date ids.
date_dict = {k: v for v, k in enumerate(date_table['cal_date'].tolist())}
date_table

Unnamed: 0,cal_date,day,month,year,qtr
0,29/1/2020,29,1,2020,1
1,30/1/2020,30,1,2020,1
2,31/1/2020,31,1,2020,1
3,1/2/2020,1,2,2020,1
4,2/2/2020,2,2,2020,1
...,...,...,...,...,...
305,29/11/2020,29,11,2020,4
306,30/11/2020,30,11,2020,4
307,1/12/2020,1,12,2020,4
308,2/12/2020,2,12,2020,4


In [121]:
# Replace the 'fecha' label of every employment row by its date id
# (-1 when the label is not in date_dict).
employe_cross = (
    employe_processed
    .assign(date_id = get_date_id(employe_processed, 'fecha', date_dict))
    .drop(columns = ['fecha'])
)
employe_cross

Unnamed: 0,TGP,TO,TD,Ocupados,Desocupados,Inactivos,date_id
0,60.1793,52.7533,12.3397,20859.9,2936.39,15746.1,32
1,51.4751,41.2136,19.9349,16374.0,4076.86,19278.8,63
2,55.3874,43.6973,21.1061,17375.4,4648.35,17739.3,93
3,57.2966,45.4439,20.6866,18214.6,4750.77,17116.2,124
4,57.2245,45.9524,19.698,18191.5,4462.35,16933.9,154
5,59.2239,49.1646,16.9851,19608.7,4012.0,16263.0,185
6,60.2381,50.5068,16.1547,20172.7,3886.71,15881.1,216
7,60.9711,51.4164,15.6708,20626.3,3832.98,15656.9,246


In [122]:
# Replace the 'fecha' label of every inflation row by its date id
# (-1 when the label is not in date_dict).
inflation_date_ids = get_date_id(inflation_processed, 'fecha', date_dict)
inflation_cross = inflation_processed.assign(date_id = inflation_date_ids).drop(columns = ['fecha'])
inflation_cross

Unnamed: 0,Inflación total,Límite superior,Meta de inflación,Límite inferior,date_id
7,3.86,4,3,2,32
6,3.51,4,3,2,63
5,2.85,4,3,2,93
4,2.19,4,3,2,124
3,1.97,4,3,2,154
2,1.88,4,3,2,185
1,1.97,4,3,2,216
0,1.75,4,3,2,246


In [123]:
# Replace the 'fecha' label of every test record by its date id
# (-1 when the label is not in date_dict).
daily_test_cross = (
    daily_test_transposed
    .assign(date_id = get_date_id(daily_test_transposed, 'fecha', date_dict))
    .drop(columns = ['fecha'])
)
daily_test_cross

Unnamed: 0,cantidad,procedencia,date_id
0,0.0,AMAZONAS,36
1,0.0,ANTIOQUIA,36
2,0.0,ARAUCA,36
3,0.0,ATLANTICO,36
4,0.0,BOGOTA,36
...,...,...,...
10369,12007.0,PROCEDENCIA_DESCONOCIDA,308
10370,0.0,BARRANQUILA,308
10371,179180.0,CARTAGENA,308
10372,77546.0,SANTA_MARTA,308


In [124]:
# Replace the 'fecha' label of every article by its date id
# (-1 when the label is not in date_dict).
news_date_ids = get_date_id(news_df, 'fecha', date_dict)
news_cross = news_df.assign(date_id = news_date_ids).drop(columns = ['fecha'])
news_cross

Unnamed: 0,titulo,categoria,resumen,date_id
0,Coronavirus tiene nombre oficial: OMS lo bauti...,Salud,Nace del acrónimo en inglés a partir de la exp...,13
1,"El Covid-19, una amenaza mundial que deja más ...",Salud,Según advirtió la Organización Mundial de la S...,13
2,Así sería la cuarentena en el país para los co...,Salud,Los 14 connacionales que serán evacuados de Ch...,20
3,No se ha confirmado ningún caso de coronavirus...,Salud,Directivas del centro hospitalario indicaron q...,21
4,Así será la evacuación y llegada de los colomb...,Salud,Ministerio de Salud sostiene que el viaje part...,23
...,...,...,...,...
862,Minsalud dice que por ahora no pedirá pruebas ...,Salud,Cartera envió preguntas al juez que con tutela...,307
863,Europa decide sobre primera vacuna covid-19 an...,Salud,Agencia Europea de Medicamentos (EMA) puso pla...,307
864,Colombia registra 182 muertes más y 8.430 nuev...,Salud,El Ministerio de Salud reportó también 6.037 r...,307
865,Vacuna a toda costa: reflexiones tras días de ...,Salud,"Análisis de Alejandro Gaviria, Tatiana Andia, ...",307


In [125]:
# (new id column, date column it replaces), in the order the id columns are added.
case_date_columns = [
    ('date_symptoms_id', 'fecha_inicio_sintomas'),
    ('date_dead_id', 'fecha_muerte'),
    ('date_diagnosis_id', 'fecha_diagnostico'),
    ('date_recovered_id', 'fecha_recuperado'),
]

daily_cases_cross = daily_cases_processed.copy()

# Each date label becomes a date id; labels missing from date_dict (including
# empty dates) become -1.
for id_column, date_column in case_date_columns:
    daily_cases_cross[id_column] = get_date_id(daily_cases_cross, date_column, date_dict)

# The original date columns are no longer needed once the ids are in place.
daily_cases_cross = daily_cases_cross.drop(columns = [date_column for _, date_column in case_date_columns])
daily_cases_cross

Unnamed: 0,departamento_nom,ciudad_municipio_nom,edad,sexo,fuente_tipo_contagio,ubicacion,estado,pais_viajo_1_nom,recuperado,tipo_recuperacion,nom_grupo_,date_symptoms_id,date_dead_id,date_diagnosis_id,date_recovered_id
0,BOGOTA,BOGOTA,19,F,Importado,Casa,Leve,ITALIA,Recuperado,PCR,,29,-1,37,44
1,VALLE,BUGA,34,M,Importado,Casa,Leve,ESPAÑA,Recuperado,PCR,,35,-1,40,50
2,ANTIOQUIA,MEDELLIN,50,F,Importado,Casa,Leve,ESPAÑA,Recuperado,PCR,,31,-1,40,46
3,ANTIOQUIA,MEDELLIN,55,M,Relacionado,Casa,Leve,,Recuperado,PCR,,37,-1,42,57
4,ANTIOQUIA,MEDELLIN,25,M,Relacionado,Casa,Leve,,Recuperado,PCR,,39,-1,42,54
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1333997,BOGOTA,BOGOTA,62,M,En estudio,Hospital,Moderado,,Activo,,,248,-1,251,-1
1333998,BOGOTA,BOGOTA,71,M,En estudio,Hospital,Moderado,,Activo,,,251,-1,251,-1
1333999,BOGOTA,BOGOTA,61,F,En estudio,Hospital,Moderado,,Activo,,,246,-1,251,-1
1334000,BOGOTA,BOGOTA,31,F,En estudio,Casa,Leve,,Activo,,,251,-1,251,-1


In [126]:
def _tweet_date_label(value):
    """Turn 'YYYY-MM-DD' into 'D/M/YYYY'; the 'nan' placeholder is returned unchanged."""
    if value == 'nan':
        return value
    parts = value.split('-')
    return '{}/{}/{}'.format(int(parts[2]), int(parts[1]), int(parts[0]))


tweets_cross = tweets_df.copy()

# Bring the tweet dates to the label format used by date_dict, then swap them
# for date ids (-1 when the label is unknown, e.g. for the 'nan' placeholder).
tweets_cross['fecha'] = tweets_cross['fecha'].apply(_tweet_date_label)
tweets_cross['date_id'] = get_date_id(tweets_cross, 'fecha', date_dict)

# Neither the date label nor the tweet id is exported.
tweets_cross = tweets_cross.drop(columns = ['fecha', 'id'])
tweets_cross

Unnamed: 0,text,date_id
0,@MinSaludCol @INSColombia @ViceColombia @Fruiz...,308
1,Covid-19: 7.986 Casos Nuevos y 168 fallecidos ...,308
2,Colombia Profunda N° 7.\nLa vacuna del COVID19...,308
3,"@SenorCaicedo @IvanDuque No joñe, parece que e...",308
4,Solo queda conciliación entre cámara y Senado ...,308
...,...,...
844,Resultados #COVID19 en #Colombia: 29 Noviembre...,305
845,#29Nov Vicepdta. @drodriven2: En las últimas 2...,305
846,"RT @RNVinformativa: #EnVivo | Presidente, @Nic...",305
847,RT @enahpiutoficial: #29Nov Vicepdta. @drodriv...,305


In [127]:
# City -> department. Some records come labelled with a city instead of a
# department, so those labels are replaced by the department the city belongs
# to: Barranquilla (both spellings found in the data) is in Atlántico,
# Cartagena is in Bolívar and Santa Marta is in Magdalena.
# NOTE(review): the department labels are assumed to be spelled 'BOLIVAR' and
# 'MAGDALENA' in the data, like the unaccented 'ATLANTICO' -- confirm against
# daily_cases_cross['departamento_nom'].
map_cities = {
     'BARRANQUILA': 'ATLANTICO',
     'CARTAGENA': 'BOLIVAR',
     'SANTA_MARTA': 'MAGDALENA',
     'BARRANQUILLA': 'ATLANTICO'
}

# Labels that are not in the mapping are kept unchanged.
daily_test_cross['procedencia'] = daily_test_cross['procedencia'].apply(lambda x: map_cities.get(x, x))
daily_cases_cross['departamento_nom'] = daily_cases_cross['departamento_nom'].apply(lambda x: map_cities.get(x, x))

In [128]:
# Location dimension: every distinct department found in the cases, in order of
# first appearance, followed by the catch-all label used by the test data.
depts = daily_cases_cross['departamento_nom'].drop_duplicates().tolist() + ['PROCEDENCIA_DESCONOCIDA']
daily_cases_location = pd.DataFrame({'procedence': depts})

In [129]:
# Location label -> row position in daily_cases_location (the origin id).
procedence_names = daily_cases_location['procedence'].tolist()
dept_map = dict(zip(procedence_names, range(len(procedence_names))))

# Swap the location labels for origin ids; labels missing from dept_map become -1.
# NOTE(review): the test labels use underscores (e.g. 'PROCEDENCIA_DESCONOCIDA');
# check that they match the spelling of the case departments, otherwise they
# silently map to -1.
daily_test_cross['origin_id'] = get_date_id(daily_test_cross, 'procedencia', dept_map)
daily_cases_cross['origin_id'] = get_date_id(daily_cases_cross, 'departamento_nom', dept_map)

# The label columns are no longer needed once the ids are in place.
daily_test_cross = daily_test_cross.drop(columns = ['procedencia'])
daily_cases_cross = daily_cases_cross.drop(columns = ['departamento_nom', 'ciudad_municipio_nom'])

In [130]:
# (table, destination file, name given to the exported index column), written in
# this order. Every file is pipe-separated and keeps the frame index as its id.
# NOTE(review): './output' must already exist when this cell runs.
exports = [
    (daily_test_cross, './output/test.csv', 'test_id'),
    (daily_cases_cross, './output/cases.csv', 'case_id'),
    (tweets_cross, './output/tweets.csv', 'tweet_id'),
    (news_cross, './output/news.csv', 'news_id'),
    (inflation_cross, './output/economy.csv', 'economy_id'),
    (employe_cross, './output/employment.csv', 'employment_id'),
    (daily_cases_location, './output/locations.csv', 'origin_id'),
    (date_table, './output/date.csv', 'date_id'),
]

for table, destination, id_label in exports:
    table.to_csv(destination, index = True, sep = '|', index_label = id_label)