### Establishing a database connection and retrieving data

In [None]:
import os

# Fully-qualified class name of the JDBC driver used for SQL Server.
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

# Connection settings. Each value can be overridden with an environment
# variable so the notebook can target another server or account without
# editing the code; the literals are the previous hard-coded defaults.
database_host = os.environ.get("STOCKS_DB_HOST", "stocksdbserver.database.windows.net")
database_port = os.environ.get("STOCKS_DB_PORT", "1433")  # update if you use a non-default port
database_name = os.environ.get("STOCKS_DB_NAME", "ANSQLBDD")
table = "stocks"
user = os.environ.get("STOCKS_DB_USER", "ANid")
# SECURITY: a plaintext password committed to source is exposed to anyone who
# can read this file. Set STOCKS_DB_PASSWORD (or use a secret manager), rotate
# this credential, and then remove the fallback literal below.
password = os.environ.get("STOCKS_DB_PASSWORD", "salutCAVA145678+-")
# JDBC connection string built from the settings above.
url = f"jdbc:sqlserver://{database_host}:{database_port};database={database_name}"



In [None]:
# Load the source table into a Spark DataFrame over JDBC, passing all the
# connection settings in a single options() call.
jdbc_reader = spark.read.format("jdbc").options(
    driver=driver,
    url=url,
    dbtable=table,
    user=user,
    password=password,
)
df = jdbc_reader.load()

### Data Exploration

In [None]:
# Print the column names and types, then preview the first five rows.
df.printSchema()
df.show(n=5)

In [None]:
# Manage date_field type
from pyspark.sql.functions import to_date

# Convert the "date_" column to Spark's date type. Because the target name
# matches the existing column, the converted values replace "date_" in place
# rather than being stored in an additional column.
df = df.withColumn("date_", to_date(df["date_"]))

#### 1. Return Rate

##### This implementation first filters the DataFrame `df` to keep only the rows for the specified stock and date range, using the `filter` method and the `col` function from `pyspark.sql.functions`. It then selects the closing price on the start date and on the end date using `filter` and `select`, and computes the number of days between the two dates. Finally, it computes the average daily return rate in percent: `((end_value - initial_value) / initial_value * 100) / number_of_days`.

In [None]:
from pyspark.sql.functions import col, datediff

def get_return_rate(df, stock, start_date, end_date):
    '''Get the average daily return rate (in percent) of a stock over an interval.

    The rate is ((close at end - close at start) / close at start * 100)
    divided by the number of calendar days between the two dates.

    Args:
        df: Spark DataFrame with "stock_name", "date_" and "close_" columns.
        stock: Value of "stock_name" to select.
        start_date: First day of the interval, as an ISO 'YYYY-MM-DD' string
            or a date/datetime object.
        end_date: Last day of the interval, in the same formats as start_date.

    Returns:
        The average daily return rate as a float, in percent per day.

    Raises:
        ValueError: If a date string is not ISO formatted, if end_date is not
            strictly after start_date, or if the stock has no row on either
            boundary date.
    '''
    from datetime import date, datetime

    def _as_date(value):
        # Normalise ISO strings and datetime objects to plain dates so the
        # day difference can be computed in Python.
        if isinstance(value, datetime):
            return value.date()
        if isinstance(value, date):
            return value
        return date.fromisoformat(str(value))

    # pyspark's datediff() builds a Column expression and treats string
    # arguments as column names, so it cannot be used on Python values here.
    delta = (_as_date(end_date) - _as_date(start_date)).days
    if delta <= 0:
        raise ValueError(
            f"end_date ({end_date}) must be after start_date ({start_date})"
        )

    stock_rows = df.filter(col("stock_name") == stock)

    init_row = stock_rows.filter(col("date_") == start_date).select("close_").first()
    end_row = stock_rows.filter(col("date_") == end_date).select("close_").first()

    # first() returns None when no row matches (e.g. a non-trading day).
    if init_row is None or end_row is None:
        missing = start_date if init_row is None else end_date
        raise ValueError(f"No closing price found for {stock} on {missing}")

    # The column may come back as Decimal (assumption based on the SQL source);
    # convert to float so the arithmetic below is well defined.
    init_val = float(init_row[0])
    end_val = float(end_row[0])

    return ((end_val - init_val) / init_val * 100.0) / delta

# Average daily return rate for APPLE between 2017-01-03 and 2017-02-03.
res = get_return_rate(
    df,
    stock='APPLE',
    start_date='2017-01-03',
    end_date='2017-02-03',
)


#### 2. Moving Average

In [None]:
from pyspark.sql.functions import col, avg, desc, lag

def calculate_moving_average(df, stock_name, start_date, end_date, num_points=5):
    '''Add a moving average of the opening price for one stock and date range.

    For each row, the moving average is the mean of the "open_" values of the
    num_points preceding rows (ordered by "date_"); the current row itself is
    not included.

    Args:
        df: Spark DataFrame with "stock_name", "date_" and "open_" columns.
        stock_name: Value of "stock_name" to select.
        start_date: First date of the range (inclusive).
        end_date: Last date of the range (inclusive).
        num_points: Number of preceding rows to average; must be >= 1.

    Returns:
        The filtered DataFrame with the helper columns "lag_1" ... "lag_<n>"
        and the result column "moving_average_<num_points>".

    Raises:
        ValueError: If num_points is smaller than 1.
    '''
    # Imported locally because Window is not among the names imported by the
    # surrounding cell.
    from pyspark.sql import Window

    if num_points < 1:
        raise ValueError(f"num_points must be >= 1, got {num_points}")

    # Select the rows corresponding to the given stock and date range
    temp = df.filter(
        (col("stock_name") == stock_name)
        & (col("date_").between(start_date, end_date))
    )

    # Order chronologically so that lag(i) is the value i rows *earlier*;
    # a descending order would average later dates instead.
    window = Window.partitionBy("stock_name").orderBy(col("date_").asc())

    # Create lag features
    for i in range(1, num_points + 1):
        temp = temp.withColumn(f"lag_{i}", lag(col("open_"), i).over(window))

    # Calculate the moving average. Rows without num_points predecessors have
    # a missing lag, so their average is expected to be null.
    avg_col = sum(col(f"lag_{i}") for i in range(1, num_points + 1)) / num_points

    # Create a new column to store the moving average
    return temp.withColumn(f"moving_average_{num_points}", avg_col)

# Five-point moving average for APPLE between 2017-01-03 and 2017-02-03,
# followed by a preview of the result.
moving_average_df = calculate_moving_average(
    df,
    stock_name="APPLE",
    start_date="2017-01-03",
    end_date="2017-02-03",
    num_points=5,
)
moving_average_df.show()


In [None]:
# Persist the moving-average result to the "MovingAvgDf" table over JDBC.
# No save mode is set, so the writer's default mode applies.
jdbc_writer = moving_average_df.write.format("jdbc").options(
    url=url,
    dbtable="MovingAvgDf",
    user=user,
    password=password,
)
jdbc_writer.save()