## ![Spark Logo Tiny](https://files.training.databricks.com/images/wiki-book/general/logo_spark_tiny.png) Azure Data Pipeline
> In this notebook, we work with a stock database hosted in Azure SQL Database.
> ##### Authors : **Abdelhadi Hirchi & Anas Zaghloul**
>   

**Two points we're going to work on**:
- **Daily return rate**: a function that takes a stock name, a start date, and an end date, and outputs the daily return rate of that stock over the period.
- **Moving average**: a function that takes a stock name, a start date, an end date, and a number of moving points (5, for example), and returns a new dataframe with the moving average applied to the opening price column.
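
In symbols, the two quantities can be written as follows. This is a sketch of the definitions as the functions later in this notebook compute them; the symbols $P_t$ (adjusted close on trading day $t$), $O_t$ (opening price on day $t$), and $n$ (number of moving points) are notation introduced here, with days in the selected period indexed from $t = 1$.

$$
r_t = \frac{P_t - P_{t-1}}{P_{t-1}} \quad (t \ge 2), \qquad r_1 = 0
$$

$$
\mathrm{MA}^{(n)}_t = \frac{1}{\min(n,\, t)} \sum_{i=\max(1,\; t-n+1)}^{t} O_i
$$

The first day has no previous close, so its return is set to 0 by convention, and the first $n - 1$ moving averages are taken over the days available so far.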

### 1. DB connection & imports

In [0]:
# imports
from pyspark.sql.functions import col, lit, lag, when, avg
from pyspark.sql.window import Window

In [0]:
# Connect to DB using jdbc connector
url = "jdbc:sqlserver://stockdblab3server.database.windows.net:1433;database=stocks_db;user=ekane@stockdblab3server;password=BonjourHiver2023!"
table_name = "stocks"
user = "ekane"
password = "BonjourHiver2023!"

# Read data from the table
df = (
    spark.read.format("jdbc")
    .option("url", url)
    .option("dbtable", table_name)
    .option("user", user)
    .option("password", password)
    .load()
)
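
The connection cell above keeps the user name and password directly in the notebook. One possible alternative, shown here only as a minimal sketch, is to build the JDBC URL without credentials and read them from the environment. The helper names `build_jdbc_url` and `load_credentials` and the variable names `STOCKS_DB_USER` and `STOCKS_DB_PASSWORD` are hypothetical and not part of the original notebook; on Databricks, a secret scope would be another place to keep such values.

```python
import os


def build_jdbc_url(server, database, port=1433):
    """Build a SQL Server JDBC URL that does not embed credentials."""
    return f"jdbc:sqlserver://{server}:{port};database={database}"


def load_credentials(env=os.environ):
    """Read the user and password from environment variables.

    STOCKS_DB_USER and STOCKS_DB_PASSWORD are assumed names chosen for
    this sketch; a KeyError is raised if either one is missing.
    """
    return env["STOCKS_DB_USER"], env["STOCKS_DB_PASSWORD"]


# Same server and database as in the cell above, without user or password
url = build_jdbc_url("stockdblab3server.database.windows.net", "stocks_db")
```

The resulting `url`, together with the values returned by `load_credentials()`, could then be passed to the same `.option(...)` calls used when reading the table.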


In [0]:
# Print data
df.show(5)

+----------+------------------+------------------+------------------+------------------+----------+------------------+------------+
|      Date|              High|               Low|             Open_|            Close_|    Volume|         Adj_Close|Company_name|
+----------+------------------+------------------+------------------+------------------+----------+------------------+------------+
|2017-01-03|29.082500457763672|28.690000534057617|28.950000762939453|29.037500381469727|1.151276E8|27.277639389038086|       APPLE|
|2019-04-18|              66.0| 60.32099914550781|              65.0|              62.0| 2.57647E7|              62.0|        ZOOM|
|2017-01-03| 789.6300048828125| 775.7999877929688| 778.8099975585938| 786.1400146484375| 1657300.0| 786.1400146484375|      GOOGLE|
|2017-01-03|117.83999633789062|115.51000213623047|116.02999877929688|116.86000061035156| 2.06639E7|116.86000061035156|    FACEBOOK|
|2017-01-03|44.066001892089844| 42.19200134277344| 42.97200012207031| 43.397

### 2. Daily return rate
A function that takes a stock name, a start date, and an end date, and outputs the daily return rate of that stock over the period.
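
To make the definition concrete before the Spark version, here is a minimal plain-Python sketch of the same calculation on a short list of prices. The function name `daily_returns` and the sample prices are made up for illustration and are not taken from the stocks table; the first day is given a return of 0.0, matching the convention used in this notebook.

```python
def daily_returns(closes):
    """Return the day-over-day relative change for an ordered list of closes.

    The first entry has no previous close, so its return is set to 0.0.
    """
    returns = []
    prev = None
    for close in closes:
        if prev is None:
            returns.append(0.0)
        else:
            returns.append((close - prev) / prev)
        prev = close
    return returns


# Made-up adjusted closes: up 10% on day two, then down 10% on day three
sample_returns = daily_returns([100.0, 110.0, 99.0])
print(sample_returns)
```

A small list like this can also serve as a quick sanity check for the Spark function on a handful of rows.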

In [0]:
# Get all unique company names
df.select("Company_name").distinct().show()

+------------+
|Company_name|
+------------+
|   MICROSOFT|
|       APPLE|
|    FACEBOOK|
|       TESLA|
|      GOOGLE|
|      AMAZON|
|        ZOOM|
+------------+



In [0]:
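# Function
def daily_return_rate(df, stock_name, start_date, end_date):
    """Return (Date, daily_return) for one stock between start_date and end_date, inclusive."""
    # Keep only the requested stock and date range
    df = df.filter((col("Company_name") == stock_name) & 
                   (col("Date") >= start_date) & 
                   (col("Date") <= end_date))
    
    df = df.select("Date", "Adj_Close")
    # Previous trading day's adjusted close; an unpartitioned window is acceptable
    # here because only one stock remains after the filter
    df = df.withColumn("prev_close", lag("Adj_Close").over(Window.orderBy("Date")))
    
    # Relative change from the previous adjusted close
    df = df.withColumn("daily_return", (col("Adj_Close") - col("prev_close"))/col("prev_close"))
    
    # The first row has no previous close, so its return is set to 0
    df = df.withColumn("daily_return", 
                       when(col("prev_close").isNull(), lit(0)).
                       otherwise(col("daily_return")))
    
    return df.select("Date", "daily_return")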

# Test the function with values like "APPLE", 2019-01-01, 2019-01-31
# show() prints the table and returns None, so it is called on its own rather than inside print()
print("Daily return rate of APPLE from 2019-01-01 to 2019-01-31 :")
daily_return_rate(df, "APPLE", "2019-01-01", "2019-01-31").show()

Daily return rate of APPLE from 2019-01-01 to 2019-01-31 :
+----------+--------------------+
|      Date|        daily_return|
+----------+--------------------+
|2019-01-02|                 0.0|
|2019-01-03|-0.09960744405197604|
|2019-01-04| 0.04268926071061226|
|2019-01-07|-0.00222573844705...|
|2019-01-08|0.019063081145243473|
|2019-01-09|0.016981688753525532|
|2019-01-10|0.003196376960033...|
|2019-01-11|-0.00981806936016...|
|2019-01-14|-0.01503704889577247|
|2019-01-15| 0.02046674790016994|
|2019-01-16|0.012216699197749542|
|2019-01-17|0.005937601110347265|
|2019-01-18|0.006159222033426...|
|2019-01-22|-0.02244590166502...|
|2019-01-23|0.004044171924360711|
|2019-01-24|-0.00792612042492...|
|2019-01-25|0.033136896859299284|
|2019-01-28|-0.00925455230129...|
|2019-01-29|-0.01036478062457124|
|2019-01-30| 0.06833468891366087|
+----------+--------------------+
only showing top 20 rows


### 3. Moving average
A function that takes a stock name, a start date, an end date, and a number of moving points (5, for example), and returns a new dataframe with the moving average applied to the opening price column.
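
As a small illustration of what a trailing moving average does, here is a minimal plain-Python sketch. The function name `trailing_moving_average` and the sample values are made up for illustration. It assumes the same behavior as the Spark window used in this section: each point averages the current value and up to `window_size - 1` preceding values, so the first few points use a shorter window.

```python
def trailing_moving_average(values, window_size):
    """Average each value with up to window_size - 1 values before it."""
    averages = []
    for i in range(len(values)):
        # Clamp the window start at 0 so early points use fewer values
        start = max(0, i - window_size + 1)
        window = values[start:i + 1]
        averages.append(sum(window) / len(window))
    return averages


# Made-up opening prices, averaged over 3 moving points
sample_ma = trailing_moving_average([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 3)
print(sample_ma)  # [1.0, 1.5, 2.0, 3.0, 4.0, 5.0]
```

The first two results average one and two values respectively, which is why the first row of a moving-average table equals the opening price itself.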

In [0]:
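# Function
def moving_average(df, stock_name, start_date, end_date, window_size):
    """Return (Date, moving_average) of the opening price for one stock, inclusive of both dates."""
    # Keep only the requested stock and date range
    df = df.filter((col("Company_name") == stock_name) & 
                   (col("Date") >= start_date) & 
                   (col("Date") <= end_date))
    
    df = df.select("Date", "Open_")
    # Trailing window: the current row plus the (window_size - 1) rows before it
    window = Window.orderBy("Date").rowsBetween(-window_size + 1, 0)
    
    # The first rows have fewer preceding rows, so they average over what is available
    df = df.withColumn("moving_average", avg("Open_").over(window))
    
    return df.select("Date", "moving_average")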

# Test the function with values like "APPLE", 2019-01-01, 2019-01-31 with 5 moving points
# show() prints the table and returns None, so it is called on its own rather than inside print()
print("Moving average of APPLE from 2019-01-01 to 2019-01-31 :")
moving_average(df, "APPLE", "2019-01-01", "2019-01-31", 5).show()

Moving average of APPLE from 2019-01-01 to 2019-01-31 :
+----------+------------------+
|      Date|    moving_average|
+----------+------------------+
|2019-01-02| 38.72249984741211|
|2019-01-03| 37.35874938964844|
|2019-01-04|36.949999491373696|
|2019-01-07| 37.00624942779541|
|2019-01-08| 37.08299942016602|
|2019-01-09| 36.90299911499024|
|2019-01-10| 37.32899932861328|
|2019-01-11|37.746499633789064|
|2019-01-14|37.854000091552734|
|2019-01-15| 37.88950042724609|
|2019-01-16|37.979000854492185|
|2019-01-17|  38.0640007019043|
|2019-01-18| 38.29500045776367|
|2019-01-22| 38.57300033569336|
|2019-01-23| 38.76699981689453|
|2019-01-24| 38.81849975585938|
|2019-01-25| 38.88249969482422|
|2019-01-28| 38.79699935913086|
|2019-01-29| 38.78899917602539|
|2019-01-30| 39.24399948120117|
+----------+------------------+
only showing top 20 rows


### 4. Save outputs to CSV files in blob storage

In [0]:
# Save to CSV files
def save_to_csv(df, file_path):
    # Spark writes a directory of part files at file_path, not a single CSV file
    df.write.mode("overwrite").option("header", "true").csv(file_path)

# Save moving average to CSV file in Azure blob storage
df_ma = moving_average(df, "APPLE", "2017-01-01", "2019-12-31", 5)
output_path_df_ma = "wasbs://datalake@abdelstock.blob.core.windows.net/moving_average.csv"
save_to_csv(df_ma, output_path_df_ma)

In [0]:
# Save daily return rate to CSV file in Azure blob storage
df_dr = daily_return_rate(df, "APPLE", "2019-01-01", "2019-01-31")
output_path_df_dr = "wasbs://datalake@abdelstock.blob.core.windows.net/daily_rating.csv"
save_to_csv(df_dr, output_path_df_dr)
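
Both output paths above follow the same `wasbs://<container>@<account>.blob.core.windows.net/<path>` pattern. As a minimal sketch, the path could be assembled by a small helper instead of being typed out twice; the function name `wasbs_path` is hypothetical and not part of the original notebook.

```python
def wasbs_path(container, account, path):
    """Assemble a wasbs:// URI for a blob container, storage account, and path."""
    # Drop any leading slash so the URI does not contain a double slash
    return f"wasbs://{container}@{account}.blob.core.windows.net/{path.lstrip('/')}"


# Same container and storage account as in the cells above
output_path_df_ma = wasbs_path("datalake", "abdelstock", "moving_average.csv")
output_path_df_dr = wasbs_path("datalake", "abdelstock", "daily_rating.csv")
```

These values match the paths used in the two save cells and could be passed to `save_to_csv` in the same way.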