In [1]:
# %pip install -q azure-functions azure-functions-durable pandas azure-storage-blob python-dotenv langchain-openai langchain pinecone-client pyarrow adlfs fsspec langchain_pinecone azure-storage-file-datalake

In [2]:
# pip freeze > requirements.txt

In [23]:
import azure.functions as func
import logging
from datetime import datetime
import os

from external.blob_storage import BlobStorage
from external.data_lake_pandas import DataLakePandas
from external.data_lake_storage import DataLakeStorage

from app.repositories.pandas_loader import DataLakeLoader
from app.repositories.blob_storage_reader import BlobStorageReader
from app.repositories.pandas_reader import DataLakeReader

from app.use_cases.data_prep_delta_curated import DataPreprocessingDeltaCurated
from app.use_cases.data_prep_delta_raw import DataPreprocessingDeltaRaw 

from dotenv import load_dotenv
load_dotenv()


# Settings read from the process environment (load_dotenv() has already run
# above). Each value is a str, or None when the variable is not set.
DELTA_DAYS = os.getenv('DELTA_DAYS')
DELTA_HOURS = os.getenv('DELTA_HOURS')
SCHEDULE = os.getenv('CRON_SCHEDULE')

# Module-level wiring of the objects the ETL cell below runs against.
# NOTE(review): every constructor here is project-local and not visible in this
# notebook; whether they open connections at construction time is unconfirmed,
# so the statement order is left exactly as written.
bp1 = func.Blueprint()  # not referenced again in the visible cells
container_name = "landing"
# Reader side: blob storage bound to the "landing" container.
blob_storage = BlobStorage(container_name)
bs_reader = BlobStorageReader(blob_storage)
# Loader side: shares one DataLakePandas instance with the reader built below.
dl_pandas = DataLakePandas()
dl_loader = DataLakeLoader(dl_pandas)

# Data-lake reader bound to the "raw" container.
dl_storage = DataLakeStorage("raw")
dl_reader = DataLakeReader(dl_pandas,dl_storage)

# Use case run first by the ETL cell (landing -> raw).
use_case_delta_raw = DataPreprocessingDeltaRaw(bs_reader, dl_loader, dl_reader)  

# Use case run second by the ETL cell (raw -> curated).
use_case_delta_curated = DataPreprocessingDeltaCurated(dl_reader, dl_loader)  


# Two-stage ETL run: landing -> raw, then raw -> curated.
# Produces `df_raw` and `df_curated`; later cells read `df_curated`.
timestamp = datetime.now()
filter_date = timestamp.strftime('%Y-%m-%d')
try:
    # Fail with an explicit message instead of the opaque TypeError that
    # int(None) raises when a variable is missing from the environment.
    if DELTA_DAYS is None or DELTA_HOURS is None:
        raise ValueError('DELTA_DAYS and DELTA_HOURS must be set in the environment')
    delta_day = int(DELTA_DAYS)
    delta_hour = int(DELTA_HOURS)

    # Stage 1: landing -> raw.
    container_source = "landing"
    container_sink = "raw"
    filename = "data_from_youtube"
    df_raw = use_case_delta_raw.execute(container_source,
                                        container_sink,
                                        filename,
                                        delta_day=delta_day,
                                        delta_hour=delta_hour)
    logging.info('external to raw')
    print('external to raw')

    # Stage 2: raw -> curated (same filename, next pair of containers).
    container_source = "raw"
    container_sink = "curated"
    # Named `row_filter` so the builtin filter() is not shadowed in the kernel
    # namespace for every cell that runs afterwards.
    row_filter = None
    df_curated = use_case_delta_curated.execute(container_source,
                                                container_sink,
                                                filename,
                                                delta_day,
                                                delta_hour,
                                                row_filter)
    logging.info('raw to curated')
    print('raw to curated')
except Exception as e:
    msn = f'Error {e} , ejecutando la etl'
    # logging.exception records at ERROR level and attaches the traceback;
    # logging.info dropped the traceback and is hidden at the default
    # WARNING level. The error is still not re-raised (best-effort run), so
    # `df_curated` may be undefined for later cells when this branch runs.
    logging.exception(msn)
    print(msn)


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 15 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   chanel_name       3 non-null      object 
 1   chanel_id         3 non-null      object 
 2   chanel_url        3 non-null      object 
 3   video_id          3 non-null      object 
 4   title             3 non-null      object 
 5   url               3 non-null      object 
 6   keywords          3 non-null      object 
 7   publish_date      3 non-null      object 
 8   relativeDateText  3 non-null      object 
 9   total_length      3 non-null      int64  
 10  total_views       3 non-null      int64  
 11  video_rating      0 non-null      float64
 12  description       3 non-null      object 
 13  duration          3 non-null      int64  
 14  caption_text_es   3 non-null      object 
dtypes: float64(1), int64(3), object(11)
memory usage: 492.0+ bytes
[2024]
abfs://sadatalakeproyecto

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_clean["caption_text_es"] = df_clean["clean_title"] + ". " + df_clean["caption_text_es"].fillna(' ')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_clean['text']= df_clean["caption_text_es"].apply(self._limpiar_texto)


[2024]
abfs://sadatalakeproyecto.dfs.core.windows.net/curated/data_from_youtube/data_from_youtube_2024
raw to curated


In [24]:
# Summarise the curated frame produced by the ETL cell: index, columns,
# non-null counts and dtypes.
df_curated.info()

<class 'pandas.core.frame.DataFrame'>
Index: 160 entries, 0 to 191
Data columns (total 9 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   chanel_name       160 non-null    object
 1   video_id          160 non-null    object
 2   source            160 non-null    object
 3   publish_date      160 non-null    object
 4   duration          160 non-null    int64 
 5   last_update_date  160 non-null    object
 6   title             160 non-null    object
 7   text              160 non-null    object
 8   year              160 non-null    int64 
dtypes: int64(2), object(7)
memory usage: 12.5+ KB


In [25]:
# Rows published on or after 2024-06-01.
# `publish_date` is an object column (see the .info() output), presumably
# ISO-formatted strings, so this is a lexicographic comparison - TODO confirm.
published_since_june = df_curated['publish_date'] >= '2024-06-01'
df_curated.loc[published_since_june]

Unnamed: 0,chanel_name,video_id,source,publish_date,duration,last_update_date,title,text,year
141,Bolsas hoy,oTJiyKjgHw8,https://www.youtube.com/watch?v=oTJiyKjgHw8,2024-06-01,4205,2024-06-01,en qué invertir desde junio en las bolsas prem...,en qué invertir desde junio en las bolsas prem...,2024
143,USACRYPTONOTICIAS,bQy-w0DoZYk,https://www.youtube.com/watch?v=bQy-w0DoZYk,2024-06-02,6791,2024-06-02,bitcoin: te concentras o pierdes el último coh...,bitcoin: te concentras o pierdes el último coh...,2024
144,Bolsas hoy,1d3ErwzUHbM,https://www.youtube.com/watch?v=1d3ErwzUHbM,2024-06-03,877,2024-06-03,bolsas hoy: amd presentó nuevo procesador ia p...,bolsas hoy: amd presentó nuevo procesador ia p...,2024
145,Bitcoin hoy,5gIUZMVDaLg,https://www.youtube.com/watch?v=5gIUZMVDaLg,2024-06-02,1100,2024-06-03,qué hará bitcoin hoy 03/06/24 08:00 análisis ...,qué hará bitcoin hoy 03/06/24 08:00 análisis t...,2024
146,Bolsas hoy,iOrr045IQf4,https://www.youtube.com/watch?v=iOrr045IQf4,2024-06-03,1416,2024-06-03,junio primera confirmación vendedora ¡qué pode...,junio primera confirmación vendedora ¡qué pode...,2024
147,Bitcoin hoy,w6_2TQAS__4,https://www.youtube.com/watch?v=w6_2TQAS__4,2024-06-03,677,2024-06-03,ruido y trampas hoy qué esperamos de bitcoin c...,ruido y trampas hoy qué esperamos de bitcoin c...,2024
148,USACRYPTONOTICIAS,s_7kVTuWFuo,https://www.youtube.com/watch?v=s_7kVTuWFuo,2024-06-03,6827,2024-06-03,bitcoin: camino al matadero | btc | cryptos | ...,bitcoin: camino al matadero | btc | cryptos | ...,2024
150,Bitcoin hoy,7TKDKPPb9Js,https://www.youtube.com/watch?v=7TKDKPPb9Js,2024-06-04,1159,2024-06-04,qué hará bitcoin hoy 04/06/24 08:00 análisis ...,qué hará bitcoin hoy 04/06/24 08:00 análisis t...,2024
151,Bolsas hoy,kcVChaXvQhw,https://www.youtube.com/watch?v=kcVChaXvQhw,2024-06-04,630,2024-06-04,aumentan las pérdidas y el vix sube hacia 15 |...,aumentan las pérdidas y el vix sube hacia 15 |...,2024
152,Bolsas hoy,mUhQEpHl0DA,https://www.youtube.com/watch?v=mUhQEpHl0DA,2024-06-04,1233,2024-06-04,de la mano: educación financiera y qué está pa...,de la mano: educación financiera y qué está pa...,2024


## Store documents in the vector store

In [26]:
import os
from dotenv import load_dotenv
load_dotenv()

True

In [27]:
from external.lang_chain_manager import LangChainManager
from external.pinecone_vdb import PineconeVectorDB
from app.repositories.pinecone_vdb_loader import PinceconeVDBRepository
from app.use_cases.clean_vector_data_base import CleanVectoreStore
from app.use_cases.data_to_vector_data_base import LoadDataVectoreStore

In [28]:
# Wiring for the vector-store cells below: one repository shared by the
# "load" and "clean" use cases.
# NOTE(review): the constructors are project-local and not visible here;
# presumably they read their credentials from the environment - confirm.
langchain_manager = LangChainManager()
pinecone_vdb = PineconeVectorDB()
vdb_repository= PinceconeVDBRepository(pinecone_vdb,langchain_manager)
load_vdb_use_case = LoadDataVectoreStore(vdb_repository)
clean_vdb_use_case = CleanVectoreStore(vdb_repository)

In [None]:
# NOTE(review): presumably empties the vector store before a reload - confirm
# before running. The saved notebook shows no execution count for this cell.
clean_vdb_use_case.execute()

In [31]:
# Display the Azure endpoint configured on the embedding client (sanity check
# that the environment was loaded). Review the rendered output before sharing.
langchain_manager.embedding.azure_endpoint

'https://openai-eafit-proyecto.openai.azure.com/'

In [44]:
# Load one day of curated rows into the vector store, keyed by video id.
rows_2024_06_01 = df_curated[df_curated['publish_date'] == '2024-06-01']
docs = load_vdb_use_case.execute(
    rows_2024_06_01,
    id_column="video_id",
    chunk_size=250,
    chunk_overlap=0,
)