In [0]:
# OAuth (service principal) settings handed to the ABFS driver for both mounts.
# Every "<...>" placeholder must be filled in before running this cell. Prefer
# fetching the client secret with dbutils.secrets.get(...) over pasting it here.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "<client-id>",                # Replace with your Azure AD Application (App) Client ID
    "fs.azure.account.oauth2.client.secret": "<client-secret>",        # Replace with your App Client Secret
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant-id>/oauth2/token" # Replace with your Azure Directory Tenant ID
}

# (container URI, DBFS mount point) for each storage layer.
# Azure container names may only contain lowercase letters, digits and hyphens,
# so create the containers as e.g. "bronze-layer" / "silver-layer".
layer_mounts = [
    # Replace <container-name> with the bronze container you created.
    ("abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/", "/mnt/bronze-layer"),
    # Replace <container-name-2> with the silver container you created.
    ("abfss://<container-name-2>@<storage-account-name>.dfs.core.windows.net/", "/mnt/silver-layer"),
]

# dbutils.fs.mount raises if the mount point is already in use, which made this
# cell fail on every re-run. Only mount the points that are not mounted yet.
existing_mount_points = {mount.mountPoint for mount in dbutils.fs.mounts()}

for layer_source, layer_mount_point in layer_mounts:
    if layer_mount_point in existing_mount_points:
        continue
    dbutils.fs.mount(
        source=layer_source,
        mount_point=layer_mount_point,
        extra_configs=configs
    )



In [0]:
pip install langchain

In [0]:
# Restart the Python process after installing langchain so the new package can be imported
# dbutils.library.restartPython()

In [0]:

import logging

# Location in ADLS (through the bronze mount) where freshly ingested data lands
new_data_path = "/mnt/bronze-layer/new_data.parquet"

# Probe the path: a failed listing is logged as "no new data" and re-raised,
# which stops the notebook before any processing cell runs.
try:
    files = dbutils.fs.ls(new_data_path)
except Exception as lookup_error:
    logging.error("No New Data Found")
    raise lookup_error
else:
    logging.info("File exists.")

In [0]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

def process_content(content: str) -> list:
    """Split one piece of text into overlapping chunks with start/end offsets.

    The text is split with ``RecursiveCharacterTextSplitter`` (chunk size 1100,
    overlap 200, ``add_start_index=True``). Each resulting document then gets an
    ``end_index`` metadata entry equal to its ``start_index`` plus the length of
    its text.

    Args:
        content: The text to split.

    Returns:
        The list of chunk documents produced by the splitter, or an empty list
        when ``content`` is falsy (``None`` or an empty string).
    """
    # Nothing to split for missing / empty content.
    if not content:
        return []

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1100,
        chunk_overlap=200,
        length_function=len,
        add_start_index=True,
    )
    documents = splitter.create_documents([content])

    for document in documents:
        # Fall back to offset 0 when the metadata carries no start_index.
        offset = document.metadata.get("start_index", 0)
        document.metadata["end_index"] = offset + len(document.page_content)

    return documents


In [0]:
import uuid
from pyspark.sql.types import IntegerType,StructField,DateType

def create_chunks_list(news_df,news_chunks):
    """Flatten per-article chunk lists into one list of chunk records.

    The rows collected from ``news_df`` are paired positionally with the entries
    of ``news_chunks``: the i-th row is combined with the i-th list of chunks.
    (The previous implementation never advanced its index, so every row was
    paired with the chunks of the first article.)

    Args:
        news_df: DataFrame whose rows expose ``guid`` and ``pub_date``.
        news_chunks: Sequence with one list of chunk documents per row of
            ``news_df``, in the same order. Each chunk must expose
            ``page_content`` and a ``metadata`` mapping containing
            ``start_index`` and ``end_index``.

    Returns:
        A list of dicts with the keys ``chunk_id`` (a fresh UUID4 string),
        ``guid``, ``pub_date``, ``chunk_text``, ``start_index`` and ``end_index``.

    Raises:
        ValueError: If the number of rows differs from the number of chunk lists,
            because the positional pairing would then be meaningless.
    """
    news_rows = news_df.rdd.collect()

    if len(news_rows) != len(news_chunks):
        raise ValueError(
            f"news_df has {len(news_rows)} rows but news_chunks has "
            f"{len(news_chunks)} entries; they must match one-to-one"
        )

    structured_news_chunks = []

    # zip keeps each article aligned with its own chunk list.
    for news, chunks in zip(news_rows, news_chunks):
        for data_chunk in chunks:
            structured_news_chunks.append({
                "chunk_id": str(uuid.uuid4()),
                "guid": news.guid,
                "pub_date": news.pub_date,
                "chunk_text": data_chunk.page_content,
                "start_index": data_chunk.metadata["start_index"],
                "end_index": data_chunk.metadata["end_index"],
            })

    return structured_news_chunks




In [0]:
# # Creating a file in ADLS
# dbutils.fs.put("/mnt/bronze-layer/test.txt", "This is a test file.", overwrite=True)
# # Removing the test file from ADLS
# dbutils.fs.rm("/mnt/bronze-layer/test.txt", recurse=False)

In [0]:
from pyspark.sql.functions import *


In [0]:
# Read the newly landed articles, normalise pub_date, split every article's
# content into chunks and build chunks_df (one row per chunk).
#
# StructType / StringType are imported explicitly here instead of relying on the
# wildcard import of pyspark.sql.functions happening to re-export them.
from pyspark.sql.types import StringType, StructType

try:
    news_df = spark.read.parquet(new_data_path)
    # Drop every row that has a null in any column.
    news_df = news_df.na.drop()

    # The date patterns below are parsed with the legacy time parser.
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

    # Rows linking to amazon.science carry a literal "GMT" suffix in pub_date;
    # all other rows are parsed with a zone designator (pattern letter Z).
    # NOTE(review): the quoted 'GMT' is matched as plain text, so those values are
    # presumably interpreted in the session time zone - confirm it is UTC.
    news_df = news_df.withColumn(
        "pub_date",
        when(
            col("link").contains("https://www.amazon.science"),
            to_timestamp(unix_timestamp(col("pub_date"), "EEE, dd MMM yyyy HH:mm:ss 'GMT'").cast("timestamp"))
        ).otherwise(
            to_timestamp(unix_timestamp(col("pub_date"), "EEE, dd MMM yyyy HH:mm:ss Z").cast("timestamp"))
        )
    )

    # Chunking runs on the driver: pull the content column and split each text.
    # NOTE(review): content_list and create_chunks_list collect news_df separately
    # and are matched by position, which assumes both collects return rows in the
    # same order - confirm for the input data.
    content_list = news_df.select("content").rdd.flatMap(lambda x: x).collect()
    news_chunks = list(map(process_content, content_list))

    structured_news_chunks = create_chunks_list(news_df,news_chunks)

    # NOTE(review): pub_date is declared as DateType although the column built
    # above is a timestamp; presumably only the calendar date is wanted - confirm.
    schema = StructType([
        StructField("chunk_id", StringType(), True),
        StructField("guid", StringType(), True),
        StructField("pub_date", DateType(), True),
        StructField("chunk_text", StringType(), True),
        StructField("start_index", IntegerType(), True),
        StructField("end_index", IntegerType(), True),
    ])

    chunks_df = spark.createDataFrame(structured_news_chunks,schema=schema)
except Exception as e:
    logging.error(e)
    # Re-raise so the notebook stops here. Swallowing the error let the next cell
    # run with a missing or stale chunks_df and then delete the input file.
    raise
    

In [0]:


# Destination of the accumulated processed articles (bronze layer) and of the
# chunks produced by this run (silver layer).
data_path = "/mnt/bronze-layer/processed_data/data.parquet"
new_chunks_path = "/mnt/silver-layer/new_chunks.parquet"

# Append mode creates the dataset when the path does not exist yet and adds to it
# otherwise, so no existence probe is needed. Previously a failure of the append
# itself fell through to a plain write, which then failed with "path already
# exists" and hid the real error.
news_df.write.mode("append").parquet(data_path)

# Store the chunks.
# NOTE(review): this uses the default save mode, which fails if new_chunks_path
# already exists - presumably a downstream job consumes and removes it; confirm.
chunks_df.write.parquet(new_chunks_path)

# Remove the new_data input because it has been processed. recurse=True also
# covers the case where the parquet dataset is a directory of part files.
dbutils.fs.rm(new_data_path, recurse=True)

True