In [0]:
from azure.storage.blob import BlobServiceClient
from datetime import datetime
from io import StringIO
import pandas as pd
from pandas import DataFrame
import ast

In [0]:
def criar_dataFrame(dados: DataFrame) -> DataFrame:
    """Criando DataFrame apartir dos dados"""
    df = pd.read_csv(StringIO(dados))
    df['dados'] = df['dados'].apply(ast.literal_eval)
    return pd.json_normalize(df['dados'])

In [0]:
def converter_para_minutos(duracao: str) -> float:
    """Converte a duração do vídeo para minutos"""
    duracao = duracao.split('H')
    horas = int(duracao[0]) if len(duracao) > 1 else 0 
    duracao = duracao[1] if len(duracao) > 1 else duracao[0]

    duracao = duracao.split('M')
    minutos = int(duracao[0]) if len(duracao) > 1 else 0 
    duracao = duracao[1] if len(duracao) > 1 else duracao[0]

    duracao = duracao.split('S')
    segundos = int(duracao[0]) if len(duracao) > 1 else 0

    return round(horas * 60 + minutos + segundos / 60, 2)

In [0]:
def tratamento_dados(df: DataFrame) -> DataFrame:
    """Tratando Dados"""
    #Excluindo Colunas 
    colunas_para_excluir = ['etag', 'snippet.channelId', 'snippet.thumbnails.default.url','snippet.thumbnails.default.width',	'snippet.thumbnails.default.height','snippet.thumbnails.medium.url','snippet.thumbnails.medium.width',	'snippet.thumbnails.medium.height',	'snippet.thumbnails.high.url', 'snippet.thumbnails.high.width', 'snippet.thumbnails.high.height', 'snippet.thumbnails.standard.url', 'snippet.thumbnails.standard.width','snippet.thumbnails.standard.height', 'snippet.thumbnails.maxres.width', 'snippet.thumbnails.maxres.height', 'snippet.categoryId',	'snippet.liveBroadcastContent', 'contentDetails.caption','contentDetails.licensedContent','contentDetails.projection', 'statistics.favoriteCount',  'snippet.defaultLanguage',	'contentDetails.regionRestriction.allowed', 'snippet.localized.title', 'snippet.localized.description', 'contentDetails.dimension']

    df = df.drop(colunas_para_excluir, axis=1)

    #Tratando a coluna 'kind'
    df['kind'] = df['kind'].apply(lambda x: x.replace('#', ' '))

    #Criando coluna dia e Horario da publicação
    df[['dia', 'horario']] = df['snippet.publishedAt'].str.split('T', n = 1, expand = True)
    df = df.drop('snippet.publishedAt', axis = 1)

    #Criando uma coluna chamada link com o link do video
    df['link'] = df['id'].apply(lambda x: 'www.youtube.com/watch?v=' + x)

    #Tratando a duração do video, padronizando todos em minutos
    df['contentDetails.duration'] = df['contentDetails.duration'].apply(lambda x: x.replace('PT', ''))
    df['contentDetails.duration'] = df['contentDetails.duration'].apply(converter_para_minutos)

    #Tratando a coluna horario
    df['horario'] = df['horario'].apply(lambda x: x.replace('Z', ''))

    #Renomeando as colunas 
    mapeamento_colunas = {
    'kind': 'tipo',
    'snippet.title': 'titulo',
    'snippet.description': 'descricao', 
    'snippet.thumbnails.maxres.url':'thumbnails_png', 
    'snippet.channelTitle': 'canal', 
    'snippet.tags': 'tags', 
    'snippet.defaultAudioLanguage':'idioma', 
    'contentDetails.duration':'duracao', 
    'contentDetails.definition': 'definicao', 
    'statistics.viewCount': 'visualizacoes', 
    'statistics.likeCount': 'likes',
    'statistics.commentCount': 'comentarios', 
    }

    df.rename(columns=mapeamento_colunas, inplace=True)

    return df 


In [0]:
if __name__ == '__main__':

    # Configurações
    nome_conta = "etldatarafael"
    key_conta = "QZA1D2QY30SNBOlXTLfG8RY8IeSjfx6T4Jr2NHTJIAo16WmImq0nLQswh0zx2iTrBY4diUArdAXO+ASt4NM94A=="
    nome_container = "bronze"
    nome_arquivo = 'data_' + datetime.now().strftime("%Y%m%d") + '.csv'

    # Conectar ao Blob Storage Bronze
    blob_service_client = BlobServiceClient(account_url=f"https://{nome_conta}.blob.core.windows.net", credential=key_conta)
    container_client = blob_service_client.get_container_client(nome_container)
    cliente_blob = container_client.get_blob_client(nome_arquivo)
    
    #Baixando dados
    dados = cliente_blob.download_blob().readall().decode('utf-8')

    #Criando DataFrame
    df = criar_dataFrame(dados)

    #Tratando DataFrame
    df = tratamento_dados(df)


    nome_container = "silver"
    container_client = blob_service_client.get_container_client(nome_container)

    # Verifica se o arquivo já existe no Blob Storage
    blob_client = container_client.get_blob_client(nome_arquivo)
    if blob_client.exists():
        # Excluindo se o arquivo ja existir 
        blob_client.delete_blob()

    new_content = df.to_csv(index=False)

    # Converter o conteúdo para bytes e fazer o upload para o Blob Storage
    blob_client.upload_blob(StringIO(new_content).read().encode('utf-8'))
