# ETL de arquivos CSV via IMAP

Este notebook é exemplo de como consumir CSVs recebidos como anexos em e-mails.

O arquivo de configuração `sources.conf` deve conter:

* usuário e senha para o servidor IMAP
* pasta de e-mail a ser analisada
* subject para as mensagens a serem buscadas
* URL SQLAlchemy para o DB que vai receber os dados analisados. A tabela terá o nome do subject.

Escrito por Avi Alkalay <dataNerd@digitalhouse.com>

Outubro de 2019

In [None]:
from configobj import ConfigObj    # dnf install python3-configobj
import email
from email.message import EmailMessage
import pandas as pd
import io
import imaplib
import hashlib
import sqlalchemy

A função `clean()` recebe o DataFrame como veio do CSV e arruma as colunas e os dados. Retorna um DataFrame limpo e organizado.

In [None]:
def clean(df):

    def makeID(a):
        idCalc=hashlib.new('shake_256')
        idCalc.update(a.encode('UTF-8'))
        return idCalc.hexdigest(5)

    # Split sucursal e apólice
    # As vezes vem o dado vem como "123-123456,234-23456", daí precisa do `split(',')[0]` antes
    temp=df['Data - Sucursal - Apólice'].str.split(',')
    temp=temp[0][0].split('-')
    df['sucursal']=int(temp[0])
    df['apolice']=int(temp[1])

    # Corrige a data e hora
    df['sec']=30
    df['date']=pd.to_datetime(df['Hour of Day'].astype(str)+df['Minute'].astype(str)+df['sec'].astype(str), format='%Y%m%d%H%M%S', errors='ignore', utc=None)

    # Calcula um ID para a linha
    df['id']=(df['Hour of Day'].astype(str)+df['Minute'].astype(str)+df['apolice'].astype(str)).apply(makeID)

    df.rename(columns={'Event Label': 'event', 'Unique Events': 'count'}, inplace=True)
    
    df.set_index('id', inplace=True)

    return df[['date', 'sucursal', 'apolice', 'event', 'count']]

Lê o arquivo de configuração e guarda seus valores em `context`

In [None]:
# Read config file
context=ConfigObj('sources.conf')

Conecta num servidor IMAP (`context['imap_server']`), entra na pasta (`context['imap_folder']`) e busca por todas as mensagens não lidas (`UNSEEN`).

In [None]:
dict(context)

In [None]:
M = imaplib.IMAP4_SSL(context['imap_server'])
M.login(context['imap_user'], context['imap_password'])
M.select(context['imap_folder'])
typ, data = M.search(None, 'UNSEEN')
#print(typ)

Itera por todas as mensagens da pasta, desanexa o arquivo CSV e já passa ele para função `clean()` para a limpeza necessária. Depois da limpeza de cada CSV, concatena ele aos outros na variável `df`.

In [None]:
df=None

for num in data[0].split():
    typ, data = M.fetch(num, '(RFC822)')
        
    for response_part in data:
        if isinstance(response_part, tuple):
            original = email.message_from_bytes(response_part[1])

#             print(original['From'])
#             print(original['Subject'])
#             print(original['Date'])
            #typ, data = mail.store(num,'+FLAGS','\\Seen')

            
            for part in original.walk():
                d=None
                
                if part.get_content_maintype() == 'multipart':
                    continue
                if part.get('Content-Disposition') is None:
                    continue

                filename = part.get_filename()
                
                try:
                    d=clean(pd.read_csv(io.BytesIO(part.get_payload(decode=True)), encoding='UTF-8', comment='#'))
                except pd.errors.EmptyDataError as e:
                    pass

#                 if d is not None:
#                     print(d.shape)
                    
                    
                if df is None:
                    df=d
                else:
                    df=df.append(d)

                
if df is not None:
    df.sort_values(by='date',inplace=True)
    print(df.shape)

M.close()
M.logout()

O DataFrame resultante contém todos os dados de todos os CSVs anexos de todas as mensagens de e-mail na pasta. Agora escreve no banco de dados (`context['database']`) na tabela de mesmo nome do subject das mensagens procuradas (`context['imap_subject']`). Substitui a tabela se já existir no DB.

In [None]:
df

In [None]:
if df is not None:
    try:
        db=sqlalchemy.create_engine(context['database'], encoding='utf8')
    except sqlalchemy.exc.SQLAlchemyError as error:
        self.logger.error('Can’t connect to DB.', exc_info=True)
        raise error

    df.reset_index().to_sql(context['imap_subject'], index=False, if_exists='replace', con=db)

In [None]:
df.tail()