# Coleta de Twitter utilizando a API do Twitter + AWS S3 e GLUE
  * Este projeto visa coletar tweets de um determinado assunto e posteriormente gravar estes dados no S3 para serem processadas utilizando o AWS Glue.
   * Para este projeto foi utilizado como assunto o'que estão dizendo dos candidatos presidenciáveis #Bolsonaro e #Lula


In [175]:
# Importação da biblioteca usadas no projeto

import tweepy
import pandas as pd
import json
import boto3
import boto
import boto.s3.connection
from datetime import datetime
from io import StringIO

# Iniciando o Pipeline TW

In [176]:
#Horario de processamento

data_now = datetime.now()
data = data_now.strftime("%Y/%m/%d")
hora = data_now.strftime("%H:%M:%S")
print('### Processamento iniciado: '+data+' as '+hora+' ###')

### Processamento iniciado: 2022/06/16 as 15:49:40 ###


# Lendo os arquivo TXT conteando as Keys Twitter + S3

In [177]:
#Lendo as chaves de acesso Twitter
with open("keysTwi.txt",'r') as linhas:
    API_key= linhas.readline().strip('\n')
    API_secret_Key= linhas.readline().strip('\n')
    access_token= linhas.readline().strip('\n')
    access_token_secret= linhas.readline().strip('\n')


#Lendo as chaves de acesso S3
with open("keysS3.txt",'r') as linhas:
    aws_access_key_id= linhas.readline().strip('\n')
    API_secret_access_Key= linhas.readline().strip('\n')

# Autenticação no Twitter

In [178]:
#Realizando a autenticação no Twitter
auth = tweepy.OAuth1UserHandler(API_key,API_secret_Key)
auth.set_access_token(access_token,access_token_secret)

try :
    api = tweepy.API(auth)
    #Testando a conexão
    public_tw = api.home_timeline()
    print("### Autenticação no TW realizada com sucesso ###")
    
except: 
    print("### Não foi possivel fazer a autenticação no TW ###")

### Autenticação no TW realizada com sucesso ###


# Autenticação S3

In [179]:
# Criando conexão com o S3
try:
    conn = boto.connect_s3( aws_access_key_id,API_secret_access_Key)
    print("### Autenticação no S3 realizada com sucesso ###")
except:
    print("### Não foi possivel fazer a autenticação no S3 ###")

### Autenticação no S3 realizada com sucesso ###


# Coleta dados do Twitter

In [180]:
#Criando a string(query) de pesquisa filtando somente novos tweets

search_query = "#Lula OR Bolsonaro" + " -filter:retweets"

In [181]:
# Criando listas de armazenamento dos TW
 
tweets = []
info = []

In [182]:
data_now = datetime.now()
data = data_now.strftime("%Y-%m-%d")
hora = data_now.strftime("%H:%M:%S")

try:
    #Criando lista de tweets
    for tweet in tweepy.Cursor(api.search_tweets,
                               q=search_query,
                               tweet_mode='extended',
                               lang='pt').items(100):
        
    #Lendo a lista de tweets obtidos
        if 'retweeted_status' in dir (tweet):
               aux = tweet.retweeted_status.full_text
        else:
                aux = tweet.full_text
        newtweet = aux.replace("\n", "")
        
        #Gravando apenas o text dos tweet
        tweets.append(newtweet)

        #Gravando todos os dados do tweet
        info.append(tweet)

        #Criando dataframe com os twetts
    df_tw = pd.DataFrame(tweets,columns=['twitter'])
    df_tw['twitter'] = df_tw['twitter'].astype(str)

    #Salvando csv com os twitter
    #df_tw.to_csv("twitterAPI-"+data+"-"+hora, "utf-8")

except :
    print("### Não foi possivel obter a lista de TW ###")
    

# Gravando o arquivo com os Twittes no S3

In [183]:
#Lendo os objetos do bucket
#Esta função sera usada para verificar se já existe arquivo salvo com o mesmo nome, se tiver não faz uma nova insrção.

def readbuckets3(aws_access_key_id,aws_secret_access_key):
    #Criando sessão no S3
    session = boto3.Session( 
             aws_access_key_id=aws_access_key_id, 
             aws_secret_access_key=API_secret_access_Key)

    #Usando a sessão criada para ler os objetos no bucket
    s3 = session.resource('s3')

    #Nome do bucket que vai ser verificado os objetos
    my_bucket = s3.Bucket('twitter-datalake-raw')

    #Listando os objetos e passando para a lista = objetosBucket
    objetosBucket = []
    
    for my_bucket_object in my_bucket.objects.all():
        objetosBucket.append(my_bucket_object.key)
        #print("Objetos no bucket :" ,my_bucket_object.key)
        
    return objetosBucket

In [184]:
#Gravando arquivo no S3

#Contando numero de linhas do dataframe
for cont in df_tw.count():
    contadorRows = cont

#Obtendo a data data do processamento
dataFile = datetime.today().strftime('%Y-%m-%d')
hora = datetime.today().strftime("%H:%M:%S")

#Definindo o nome do novo arquivo 
novoArquivo = 'File-'+dataFile+'.csv'

#Instanciando a função
objetosBucket = readbuckets3(aws_access_key_id,API_secret_access_Key)

if contadorRows > 0 :
    
    #Verificando se já existe objeto gravado com o mesma data+nome
    for objetos in objetosBucket:
        if objetos == novoArquivo:
            contador = True
        else:
             contador = False 
                
        #Inserindo o arquivo no S3
    if contador == False:
            #Criando a conexão com o S3
            s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id,
                                    aws_secret_access_key = API_secret_access_Key)
            #Gravando o arquivo no S3
            csv_buf = StringIO()
            df_tw.to_csv(csv_buf, header=True , index=False)
            csv_buf.seek(0)
            s3.put_object(Bucket='twitter-datalake-raw', Body=csv_buf.getvalue(), Key='File-'+dataFile+'.csv')
            print("O arquivo >" + novoArquivo +"< foi gravado com sucesso no S3")         
    else:
            print("O arquivo >" + novoArquivo +"< já foi gravado no S3")
else:
    print("Nenhum registro disponivel para gravação no S3")

O arquivo >File-2022-06-16.csv< já foi gravado no S3


In [185]:
#Listando os arquivos existentes no bucket

for objetos in objetosBucket:
        print(objetos)

File-2022-06-12.csv
File-2022-06-14.csv
File-2022-06-16.csv


In [186]:
# Fim do processamento
#Horario de processamento

data_now = datetime.now()
data = data_now.strftime("%Y/%m/%d")
hora = data_now.strftime("%H:%M:%S")
print('### Processamento finalizado: '+data+' as '+hora+' ###')

### Processamento finalizado: 2022/06/16 as 15:49:55 ###
