In [0]:
import requests
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# NOTE(review): `scheme_code` is not used in this cell (fetch_scheme takes its
# own argument); kept only in case another cell reads this global.
scheme_code = 125497
# List endpoint; its entries carry the `schemeCode` field read by the next cell.
url = "https://api.mfapi.in/mf"

# A timeout prevents the cell from hanging forever on a stalled connection, and
# raise_for_status() stops an HTTP error page from being parsed as the list.
response = requests.get(url, timeout=30)
response.raise_for_status()
data = response.json()
# spark.createDataFrame cannot infer a schema from an empty list; fail clearly.
assert data, "mfapi returned an empty scheme list"
# The list may be long; report its size instead of printing every entry
# (display(df) below renders the contents).
print(f"Fetched {len(data)} schemes")
df = spark.createDataFrame(data)
display(df)

In [0]:
# Scheme codes to fetch, as a plain Python list. Select the single column in
# Spark first so only `schemeCode` (not the whole frame) is moved to the driver.
df_list = df.select("schemeCode").toPandas()["schemeCode"].tolist()

In [0]:
# Number of scheme codes the batched fetch loop will request.
print("{}".format(len(df_list)))

In [0]:
import requests
import time
import random
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed

# ---------------------------------------
# Fetch function
# ---------------------------------------
def fetch_scheme(scheme_code, retries=3, timeout=10):
    """Fetch the metadata and NAV history of one scheme from api.mfapi.in.

    Transient failures are retried with a growing, jittered back-off:
    network errors, timeouts, bodies that are not valid JSON, and non-200
    responses other than 4xx client errors (429 is still retried).

    Args:
        scheme_code: Identifier appended to the ``/mf/`` endpoint.
        retries: Maximum number of HTTP attempts (default 3).
        timeout: Per-request timeout in seconds passed to ``requests.get``.

    Returns:
        ``(scheme_code, meta, nav_data)``. ``meta`` is the response's
        ``"meta"`` value (``{}`` if the key is absent). ``nav_data`` is its
        ``"data"`` value (``[]`` if absent or null). If every attempt fails,
        returns ``(scheme_code, None, None)``. Network, HTTP and JSON errors
        are absorbed rather than raised.
    """
    url = f"https://api.mfapi.in/mf/{scheme_code}"

    for attempt in range(retries):
        try:
            r = requests.get(url, timeout=timeout)
            if r.status_code == 200:
                payload = r.json()
                if not isinstance(payload, dict):
                    # Valid JSON but not an object: an identical retry won't help.
                    break
                meta = payload.get("meta", {})
                # `or []` also normalises an explicit JSON null, so callers can
                # always iterate / take len() of nav_data.
                nav_data = payload.get("data") or []
                return scheme_code, meta, nav_data
            if 400 <= r.status_code < 500 and r.status_code != 429:
                # Client errors (e.g. 404 for an unknown code) are permanent.
                break
        except (requests.RequestException, ValueError):
            # Connection error, timeout, or an unparsable body (requests'
            # JSONDecodeError subclasses ValueError): retry.
            pass

        # Back off only if another attempt follows.
        if attempt < retries - 1:
            time.sleep(1 + attempt * 2 + random.random())

    return scheme_code, None, None   # failed completely


# ---------------------------------------
# Output directory (CHANGE IF NEEDED)
# ---------------------------------------
# One CSV per batch is written here. The Spark read cell loads this whole
# directory, so leftover files from an earlier run (e.g. with a different
# BATCH size) would be read too — clear it before a fresh full run.
output_dir = "/Volumes/raw_data_files/mutual_funds/raw_files1"

# Create the directory now, so a bad or missing path fails before any network
# work instead of at the first `to_csv` call after a whole batch is fetched.
import os
os.makedirs(output_dir, exist_ok=True)


# ---------------------------------------
# Batch settings
# ---------------------------------------
BATCH = 1000       # scheme codes fetched per batch (one CSV file per batch)
MAX_THREADS = 40   # worker threads, i.e. concurrent fetch_scheme calls

# ---------------------------------------
# MAIN LOOP
# ---------------------------------------
for batch_start in range(0, len(df_list), BATCH):

    batch = df_list[batch_start : batch_start + BATCH]
    print(f"\n🔵 Processing batch {batch_start} → {batch_start + len(batch)}")

    batch_records = []
    success_codes = []
    empty_codes = []
    failed_codes = []

    # --------------------------
    # Run in MULTITHREADING
    # --------------------------
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as ex:
        # future -> scheme code, so an exception raised inside a worker can
        # still be attributed to the scheme that caused it.
        futures = {ex.submit(fetch_scheme, sc): sc for sc in batch}

        for fut in as_completed(futures):
            try:
                sc, meta, nav_data = fut.result()
            except Exception as exc:
                # One bad scheme must not abort the run and discard the rows
                # already collected for this batch; record it as failed.
                sc = futures[fut]
                print(f"❌ {sc}: {exc!r}")
                failed_codes.append(sc)
                continue

            # FAILED: no usable metadata (None after exhausted retries, or a
            # malformed non-object value).
            if not isinstance(meta, dict):
                failed_codes.append(sc)
                continue

            # EMPTY: request succeeded but returned no NAV rows (also covers None).
            if not nav_data:
                empty_codes.append(sc)
                continue

            # SUCCESS: one row per NAV entry, with the scheme metadata repeated
            # on every row (entry keys win if a key appears in both).
            success_codes.append(sc)
            batch_records.extend({**meta, **entry} for entry in nav_data)

    # --------------------------
    # LOG SUMMARY
    # --------------------------
    print(f"✔ Success: {len(success_codes)}")
    print(f"⚠ Empty: {len(empty_codes)} → {empty_codes[:10]}")
    print(f"❌ Failed: {len(failed_codes)} → {failed_codes[:10]}")
    print(f"📄 Total rows in batch: {len(batch_records)}")

    # --------------------------
    # SAVE AS SINGLE CSV FILE
    # --------------------------
    if batch_records:
        # NOTE(review): column order follows first-seen key order within this
        # batch. If schemes differ in metadata keys, files may end up with
        # different column layouts, which spark.read.csv(header=True) does not
        # reconcile by name — confirm the keys are uniform.
        df_batch = pd.DataFrame(batch_records)

        # File name carries the inclusive index range, e.g. batch_0_999.csv.
        csv_path = f"{output_dir}/batch_{batch_start}_{batch_start+len(batch)-1}.csv"
        df_batch.to_csv(csv_path, index=False)

        print(f"💾 Saved CSV → {csv_path}")
    else:
        print("⚠ No rows found. Nothing saved.")

print("\n✅ All batches completed.")


In [0]:
# Load every batch CSV written by the fetch loop into a single Spark DataFrame.
# The first line of each file is the header; column types are inferred.
df_all = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/raw_data_files/mutual_funds/raw_files1/")
)

display(df_all)


In [0]:
# Persist the combined NAV data as a managed Delta table, replacing any
# previous contents of db.mf_nav_data.
# NOTE(review): if the inferred schema changes between runs, overwrite may be
# rejected unless overwriteSchema is set — confirm for your Delta version.
df_all.write.saveAsTable("db.mf_nav_data", format="delta", mode="overwrite")


In [0]:
%sql
-- Sanity check: how many distinct schemes ended up in the Delta table.
SELECT COUNT(DISTINCT scheme_code)
FROM db.mf_nav_data