In [0]:
# Creating widgets for leveraging parameters, and printing the parameters

dbutils.widgets.text("start", "","")
start = dbutils.widgets.get("start")
print ("Param -\'start':")
print (start)

dbutils.widgets.text("end", "","")
end = dbutils.widgets.get("end")
print ("Param -\'end':")
print (end)

dbutils.widgets.text("stock", "","")
stock = dbutils.widgets.get("stock")
print ("Param -\'stock':")
print (stock)

dbutils.widgets.text("container", "","")
container = dbutils.widgets.get("container")
print ("Param -\'container':")
print (container)

dbutils.widgets.text("storage", "","")
storage = dbutils.widgets.get("storage")
print ("Param -\'storage':")
print (storage)

dbutils.widgets.text("account_k", "","")
account_k = dbutils.widgets.get("account_k")
print ("Param -\'account_key':")
print (account_k)

dbutils.widgets.text("points", "","")
points = int(dbutils.widgets.get("points"))
print ("Param -\'points':")
print (points)

dbutils.widgets.text("jdbc", "","")
jdbc = dbutils.widgets.get("jdbc")
print ("Param -\'jdbc':")
print (jdbc)

dbutils.widgets.text("user", "","")
user = dbutils.widgets.get("user")
print ("Param -\'user':")
print (user)

dbutils.widgets.text("pswd", "","")
pswd = dbutils.widgets.get("pswd")
print ("Param -\'pswd':")
print (pswd)

In [0]:
stocks_table = (spark.read
  .format("jdbc")
  .option("url", jdbc)
  .option("dbtable", "Stocks")
  .option("user", user)
  .option("password", pswd)
  .load()
)

In [0]:
#stocks_table.show()
stocks_table.printSchema()

root
 |-- Date_V: timestamp (nullable = true)
 |-- High_V: double (nullable = true)
 |-- Low_V: double (nullable = true)
 |-- Open_V: double (nullable = true)
 |-- Close_V: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Adj_Close: double (nullable = true)
 |-- company_name: string (nullable = true)



In [0]:
from pyspark.sql.functions import *
def daily_returns(df,stock,start,end):
    df=df.withColumn("date",to_date("Date_V")).drop("Date_V")
    df=df.filter((col("company_name")==stock)).filter((col("date")>=start) & (col("date")<=end))
    df=df.withColumn("Daily_returns",round(col("Close_V")-col("Open_V"),2))

    return_rate=df.select(mean('Daily_returns'))

    return return_rate

In [0]:
daily_r=daily_returns(stocks_table,stock,start,end)
daily_r.show()

+------------------+
|avg(Daily_returns)|
+------------------+
|2.6356521739130434|
+------------------+



In [0]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
def moving_avg(df, stock, start, end, points):
  inter_df=df
  inter_df=inter_df.withColumn("date",to_date("Date_V")).drop("Date_V")
  inter_df=inter_df.filter((col("company_name")==stock)).filter((col("date")>=start) & (col("date")<=end))
  cumulative_window_1 = Window.orderBy(
    'Date'
  ).rowsBetween(
    -points+1,0
  )

  inter_df = inter_df.select(
    'Date','Open_V'
  ).withColumn(
    'Moving_avg_Open_V',F.round(F.avg('Open_V').over(cumulative_window_1),3)
  )


  return inter_df

In [0]:
mv_avg=moving_avg(stocks_table,stock,start,end,points)
mv_avg.show()

+----------+-----------------+-----------------+
|      Date|           Open_V|Moving_avg_Open_V|
+----------+-----------------+-----------------+
|2017-01-03|757.9199829101562|           757.92|
|2017-01-04|758.3900146484375|          758.155|
|2017-01-05|761.5499877929688|          759.287|
|2017-01-06|782.3599853515625|          765.055|
|2017-01-09|            798.0|          771.644|
|2017-01-10|796.5999755859375|           779.38|
|2017-01-11|793.6599731445312|          786.434|
|2017-01-12|800.3099975585938|          794.186|
|2017-01-13|814.3200073242188|          800.578|
|2017-01-17|815.7000122070312|          804.118|
|2017-01-18|            809.5|          806.698|
|2017-01-19|            810.0|          809.966|
|2017-01-20| 815.280029296875|           812.96|
|2017-01-23|806.7999877929688|          811.456|
|2017-01-24|            822.0|          812.716|
|2017-01-25|825.7899780273438|          815.974|
|2017-01-26| 835.530029296875|           821.08|
|2017-01-27|        

In [0]:
storage_account_name = storage
storage_account_key = account_k
container_n = container

spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)

In [0]:
try:
    dbutils.fs.mount(
      source = f"wasbs://{container_n}@{storage_account_name}.blob.core.windows.net",
      mount_point = "/mnt/azureoutputblobstorage",
      extra_configs = {f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key}
    )
except:
    print('already mounted')



Out[63]: True

In [0]:
dbutils.fs.mounts()

Out[62]: [MountInfo(mountPoint='/databricks-datasets', source='databricks-datasets', encryptionType=''),
 MountInfo(mountPoint='/mnt/azureStorage', source='wasbs://output@storageaccountponleb.blob.core.windows.net', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-tracking', source='databricks/mlflow-tracking', encryptionType=''),
 MountInfo(mountPoint='/databricks-results', source='databricks-results', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-registry', source='databricks/mlflow-registry', encryptionType=''),
 MountInfo(mountPoint='/', source='DatabricksRoot', encryptionType='')]

In [0]:
%fs ls "dbfs:/mnt/azureoutputblobstorage"

path,name,size,modificationTime
dbfs:/mnt/azureoutputblobstorage/merged.csv,merged.csv,731149,1671272085000


In [0]:
!pip install fsspec

Collecting fsspec
  Downloading fsspec-2022.11.0-py3-none-any.whl (139 kB)
[?25l[K     |██▍                             | 10 kB 35.9 MB/s eta 0:00:01[K     |████▊                           | 20 kB 9.9 MB/s eta 0:00:01[K     |███████                         | 30 kB 14.2 MB/s eta 0:00:01[K     |█████████▍                      | 40 kB 9.6 MB/s eta 0:00:01[K     |███████████▊                    | 51 kB 8.5 MB/s eta 0:00:01[K     |██████████████                  | 61 kB 10.1 MB/s eta 0:00:01[K     |████████████████▌               | 71 kB 10.5 MB/s eta 0:00:01[K     |██████████████████▉             | 81 kB 11.8 MB/s eta 0:00:01[K     |█████████████████████▏          | 92 kB 10.1 MB/s eta 0:00:01[K     |███████████████████████▌        | 102 kB 10.9 MB/s eta 0:00:01[K     |█████████████████████████▉      | 112 kB 10.9 MB/s eta 0:00:01[K     |████████████████████████████▏   | 122 kB 10.9 MB/s eta 0:00:01[K     |██████████████████████████████▌ | 133 kB 10.9 MB/s eta 

In [0]:
daily_r.toPandas().to_csv("/dbfs/mnt/azureoutputblobstorage/daily_return.csv",index=False)
mv_avg.toPandas().to_csv("/dbfs/mnt/azureoutputblobstorage/moving_average.csv",index=False)

In [0]:
%fs ls "dbfs:/mnt/azureoutputblobstorage"

path,name,size,modificationTime
dbfs:/mnt/azureoutputblobstorage/daily_return.csv/,daily_return.csv/,0,1671282617000
dbfs:/mnt/azureoutputblobstorage/merged.csv,merged.csv,731149,1671272085000
dbfs:/mnt/azureoutputblobstorage/moving_average.csv/,moving_average.csv/,0,1671282618000
