In [0]:
# Declare the notebook's text widgets (job parameters). Each one is created
# with an empty default value and an empty label.
for widget_name in ("bronze_location", "concurrency", "date_start", "date_end"):
    dbutils.widgets.text(widget_name, "", "")

In [0]:
# Read the current widget values. They are returned as strings, so
# `concurrency` and the two dates are parsed in later cells.
bronze_location, concurrency, date_start, date_end = (
    dbutils.widgets.get(widget_key)
    for widget_key in ("bronze_location", "concurrency", "date_start", "date_end")
)

In [0]:
from datetime import datetime, timedelta
import requests
import math
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, FloatType, DateType
from pyspark.sql.functions import col, to_date

In [0]:

# Both widget dates are expected in day/month/year form, e.g. 01/04/2006.
# strptime raises ValueError if either value does not match the format.
start_date, end_date = (
    datetime.strptime(raw_date, '%d/%m/%Y') for raw_date in (date_start, date_end)
)

In [0]:

# Fail early with a clear message: an empty range would otherwise surface
# later as a zero-partition / empty-dataset error from Spark.
if end_date < start_date:
    raise ValueError(
        "date_end ({}) must not be earlier than date_start ({})".format(date_end, date_start)
    )

# One single-element row per calendar day in [start_date, end_date], inclusive.
# Each day is wrapped in a list so the result can be passed to
# spark.createDataFrame as a one-column dataset.
date_range_list = [
    [start_date + timedelta(days=day_offset)]
    for day_offset in range((end_date - start_date).days + 1)
]

print("Concurrency:",concurrency)

# The widget value is a string; reject empty / non-numeric / non-positive input
# explicitly instead of letting int() or repartition() fail with an opaque error.
try:
    requested_concurrency = int(concurrency)
except ValueError:
    raise ValueError(
        "concurrency must be a positive integer, got {!r}".format(concurrency)
    )
if requested_concurrency < 1:
    raise ValueError(
        "concurrency must be a positive integer, got {!r}".format(concurrency)
    )

# Never use more partitions than there are dates to fetch.
new_concurrency = min([requested_concurrency, len(date_range_list)])
print("New Concurrency:", new_concurrency)

In [0]:
# Build a single-column DataFrame ("date") holding one row per day to ingest.
columns = ["date"]
date_df = spark.createDataFrame(data=date_range_list, schema=columns)

In [0]:
# Preview the generated dates (one row per day in the requested range).
display(date_df)

date
2006-04-01T00:00:00.000+0000
2006-04-02T00:00:00.000+0000
2006-04-03T00:00:00.000+0000
2006-04-04T00:00:00.000+0000
2006-04-05T00:00:00.000+0000
2006-04-06T00:00:00.000+0000
2006-04-07T00:00:00.000+0000
2006-04-08T00:00:00.000+0000
2006-04-09T00:00:00.000+0000
2006-04-10T00:00:00.000+0000


In [0]:
# Spread the dates over `new_concurrency` partitions; each partition is later
# handed to `ingest` through mapPartitions.
partitioned_date_df = date_df.repartition(numPartitions=new_concurrency)

In [0]:
def ingest(partitionData, timeout=60):
    """Download the AMFI NAV history report for every date row of one partition.

    Written to be used with ``rdd.mapPartitions``: it receives the rows of a
    single partition and returns an iterator over the parsed report lines.

    Args:
        partitionData: Iterable of rows exposing a ``date`` attribute that
            supports ``strftime`` (one row per day to download).
        timeout: Seconds passed to ``requests.get`` as its timeout, so a
            stalled connection fails the task instead of hanging it forever.

    Returns:
        Iterator over lists of strings, one list per response line that
        contains a ``;``, split on ``;``. Only the first collected row of the
        partition is dropped (presumably the report header - TODO confirm);
        any further rows, including header lines repeated by later dates,
        are passed through unchanged.

    Raises:
        requests.exceptions.HTTPError: If the portal answers with a 4xx/5xx
            status, rather than parsing an error page as data.
        requests.exceptions.Timeout: If the portal does not respond in time.
    """
    # Perform heavy initializations like database connections here.
    res = []
    for element in partitionData:
        url = "https://portal.amfiindia.com/DownloadNAVHistoryReport_Po.aspx?frmdt=" + element.date.strftime("%m-%d-%Y")
        response = requests.get(url, timeout=timeout)
        # Surface HTTP errors explicitly instead of splitting an error body into rows.
        response.raise_for_status()
        for line in response.text.split("\r\n"):
            # Lines without a field separator (blank lines, section titles) are skipped.
            if ";" in line:
                res.append(line.split(";"))

    return iter(res[1:])

In [0]:
# Column names for the ingested report: split the ';'-separated header, then
# make each name identifier-friendly by turning '/' and ' ' into '_'.
schema = [
    header_field.replace("/", "_").replace(" ", "_")
    for header_field in "Scheme Code;Scheme Name;ISIN Div Payout/ISIN Growth;ISIN Div Reinvestment;Net Asset Value;Repurchase Price;Sale Price;Date".split(";")
]

In [0]:
# Show the derived column names as the cell output.
schema

In [0]:
# Target number of rows per partition used to size the repartition of the
# ingested data.
ROWS_PER_PARTITION = 10000

# Run `ingest` once per partition of dates and name the resulting columns.
mutualfund_df = partitioned_date_df.rdd.mapPartitions(ingest).toDF(schema)

# Drop report header rows (their first field is the literal "Scheme Code").
# Cache the result: count() below materializes it, so the later write reuses
# the downloaded rows instead of fetching every report from the portal again.
mutualfund_df = mutualfund_df.filter(col("Scheme_Code") != "Scheme Code").cache()

# repartition() needs a positive partition count, so never go below 1 even
# when no data rows were returned.
new_partition = max(1, math.ceil(mutualfund_df.count() / ROWS_PER_PARTITION))

In [0]:
# Rebalance the ingested rows across `new_partition` partitions before casting
# and writing.
repartitioned_mfdf = mutualfund_df.repartition(numPartitions=new_partition)

In [0]:
# Convert selected columns of the ingested data to typed values:
#   Scheme_Code                  -> int
#   Repurchase_Price, Sale_Price -> float
#   Date                         -> date, parsed with the pattern dd-MMM-yyyy
# Columns not listed here are left as they are.
final_df = (
    repartitioned_mfdf
    .withColumn("Scheme_Code", col("Scheme_Code").cast("int"))
    .withColumn("Repurchase_Price", col("Repurchase_Price").cast("float"))
    .withColumn("Sale_Price", col("Sale_Price").cast("float"))
    .withColumn("Date", to_date(col("Date"), "dd-MMM-yyyy"))
)

In [0]:
# Append a zero-row slice of the final frame to the Delta location,
# partitioned by Date. No data rows are written by this statement;
# presumably it ensures the table exists with the right schema before the
# index is created - TODO confirm.
empty_final_df = final_df.limit(0)
(
    empty_final_df.write
    .format("delta")
    .mode("append")
    .partitionBy("Date")
    .save(bronze_location)
)

In [0]:
# Create a bloom filter index on Scheme_Name for the Delta table at
# `bronze_location`.
# NOTE(review): numItems is hard-coded to 1, which looks very small for a
# per-file distinct-value estimate - confirm the intended value.
bloom_index_columns = ",".join(["Scheme_Name"])
bloom_index_sql = "CREATE BLOOMFILTER INDEX ON TABLE delta.`{table}` FOR COLUMNS({search_keys} OPTIONS(fpp=0.1, numItems={count}))".format(
    table=bronze_location,
    search_keys=bloom_index_columns,
    count=1,
)
spark.sql(bloom_index_sql)

In [0]:
# Append the typed data to the Delta table at `bronze_location`, partitioned
# by Date.
(
    final_df.write
    .format("delta")
    .mode("append")
    .partitionBy("Date")
    .save(bronze_location)
)

In [0]:
# Compact the table's files and co-locate rows by Scheme_Code.
# The table is partitioned by Date (see partitionBy("Date") on the writes),
# and Delta rejects Z-ordering on a partition column, so the Z-order key must
# be a data column. NOTE(review): Scheme_Code is chosen as the lookup key -
# confirm it matches the table's main query pattern.
spark.sql("OPTIMIZE delta.`{table}` ZORDER BY ({search_keys})".format(table=bronze_location, search_keys="Scheme_Code"))