In [0]:
# Best-effort cleanup: drop any existing mount at this mount point before the
# mount call in the following cell runs.
try:
    dbutils.fs.unmount("/mnt/mf-production/kuvera/")
except Exception as e:
    # Presumably the usual cause is that nothing is mounted yet (TODO confirm).
    # The failure is reported rather than silently discarded so that other
    # causes (permissions, bad path) remain visible in the cell output.
    print(f"Skipping unmount of /mnt/mf-production/kuvera/: {e}")

In [0]:
# Mount the blob container at a DBFS mount point, authenticating with a SAS
# token read from the secret scope. The token is looked up inline so it is not
# bound to a notebook-level variable.
mount_source = "wasbs://mf-production@mutualfundstoragecdac.blob.core.windows.net"
mount_target = "/mnt/mf-production/kuvera/"
sas_config_key = "fs.azure.sas.mf-production.mutualfundstoragecdac.blob.core.windows.net"

dbutils.fs.mount(
    source=mount_source,
    mount_point=mount_target,
    extra_configs={
        sas_config_key: dbutils.secrets.get(
            scope="mutual-fund-keyvalut-secrets",
            key="storage-sas-prod-sas",
        ),
    },
)

# Root folder of the extracts inside the mounted container.
kuvera_path = "/mnt/mf-production/kuvera/kuveraextracts/"

In [0]:
from datetime import datetime
import pytz
import pandas as pd
import pyarrow.parquet as pq
from pyspark.sql.functions import col, concat_ws, to_json

# Number of leading entries of the `comparison` array that get flattened into
# their own columns further down.
MAX_COMPARISON_COUNT = 5

# The folder to read is <kuvera_path>/YYYY/MM/DD for the current date in the
# Asia/Kolkata timezone.
today = datetime.now(pytz.timezone("Asia/Kolkata"))
today_path = f"{kuvera_path}{today.year:04d}/{today.month:02d}/{today.day:02d}"

# `files` stays empty unless the listing succeeds and returns at least one entry.
files = []
try:
    listing = dbutils.fs.ls(today_path)
    if not listing:
        # An empty folder is reported through the same handler as a missing one.
        raise FileNotFoundError(f"No files found at {today_path}")
    files = listing
except Exception as e:
    print(f"Path not found or inaccessible: {e}")

# Struct fields extracted from each `comparison` entry, in output column order.
COMPARISON_FIELDS = (
    "1y",
    "3y",
    "5y",
    "aum",
    "code",
    "expense_ratio",
    "inception",
    "info_ratio",
    "name",
    "short_name",
    "slug",
    "volatility",
)

pandas_frames = []

# NOTE(review): when `files` is empty nothing below runs and `spark_df` is never
# assigned, so any later cell that uses `spark_df` will hit a NameError.
if files:
    # Load every listed file with pyarrow via its local /dbfs/ path.
    for file in files:
        try:
            # Rewrite only the leading "dbfs:/" scheme (count=1) so a path that
            # happens to contain the same text further along is left intact.
            local_path = file.path.replace("dbfs:/", "/dbfs/", 1)
            pandas_frames.append(pq.read_table(local_path).to_pandas())
        except Exception as e:
            # One unreadable entry is reported and skipped; the rest still load.
            print(f"Failed to read {file.path}: {e}")

    if not pandas_frames:
        raise ValueError("No valid data loaded from today's files.")

    full_df = pd.concat(pandas_frames, ignore_index=True)

    # Convert nanosecond-precision datetime columns to millisecond precision
    # before the frame is handed to Spark.
    for col_name in full_df.select_dtypes(include=["datetime64[ns]"]).columns:
        full_df[col_name] = full_df[col_name].astype("datetime64[ms]")

    spark_df = spark.createDataFrame(full_df)

    # Turn nested columns into strings: `tags` is comma-joined, `nav` and
    # `last_nav` are serialised as JSON.
    spark_df = (
        spark_df
        .withColumn("tags", concat_ws(",", col("tags")))
        .withColumn("nav", to_json(col("nav")))
        .withColumn("last_nav", to_json(col("last_nav")))
    )

    # Flatten the first MAX_COMPARISON_COUNT entries of `comparison` into
    # columns named cmp_<n>_<field>, with n starting at 1.
    comparison_cols = {
        f"cmp_{idx + 1}_{field}": col("comparison").getItem(idx).getField(field)
        for idx in range(MAX_COMPARISON_COUNT)
        for field in COMPARISON_FIELDS
    }

    # Add the flattened columns, then drop the nested source column.
    spark_df = spark_df.withColumns(comparison_cols).drop("comparison")



In [0]:
# Write the flattened DataFrame to the KuveraPortfolioExtracts table over JDBC
# in "overwrite" mode, then release the storage mount whether or not the write
# succeeded.
try:
    connection_properties = {
        "user": 'cdac',
        "password": dbutils.secrets.get(scope="mutual-fund-keyvalut-secrets", key="storage-database-secret"),
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    }

    (
        spark_df.write
        .mode("overwrite")
        .jdbc(
            url=dbutils.secrets.get(scope="mutual-fund-keyvalut-secrets", key="storage-database-jdbc"),
            table='KuveraPortfolioExtracts',
            properties=connection_properties,
        )
    )

except Exception as e:
    # NOTE(review): a failed write is only printed, so the cell still finishes
    # without raising — confirm that is the intended behaviour for scheduled runs.
    print(f"Error writing to SQL Server: {e}")

finally:
    # Guarded so a cleanup failure (for example, the mount is already gone) is
    # reported instead of raising out of the cell.
    try:
        dbutils.fs.unmount("/mnt/mf-production/kuvera/")
    except Exception as unmount_error:
        print(f"Failed to unmount /mnt/mf-production/kuvera/: {unmount_error}")