In [0]:
from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window
from pyspark.sql.functions import avg
from pyspark.sql import SparkSession
import os


In [0]:
# Connection settings for the SQL Server database that holds the stock data.
# Credentials are read from the environment so that no secret is stored in the
# notebook: set LAB2_SQL_PASSWORD (and optionally LAB2_SQL_URL / LAB2_SQL_USER)
# before running. When LAB2_SQL_PASSWORD is unset the password is an empty
# string, so the JDBC login is expected to fail rather than use a baked-in
# secret.
db_url = os.environ.get(
    "LAB2_SQL_URL",
    "jdbc:sqlserver://lab2-sqlserver.database.windows.net:1433;database=lab2SQL",
)
db_properties = {
    "user": os.environ.get("LAB2_SQL_USER", "lab2admin"),
    "password": os.environ.get("LAB2_SQL_PASSWORD", ""),
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}

In [0]:
def daily_return_rate(stock_name, start_date, end_date):
    """Compute the day-over-day return rate of a stock's closing price.

    Reads the ``combined_stock_data`` table over JDBC, keeps the rows whose
    ``stock_name`` equals ``stock_name`` and whose ``Date`` lies between
    ``start_date`` and ``end_date`` (both bounds inclusive), and derives
    ``(Close - previous Close) / previous Close`` with rows ordered by ``Date``.

    Relies on the notebook-level ``spark`` session, ``db_url`` and
    ``db_properties``.

    Args:
        stock_name: Value matched against the ``stock_name`` column.
        start_date: Lower bound of the period (inclusive).
        end_date: Upper bound of the period (inclusive).

    Returns:
        A DataFrame with the columns ``Date``, ``stock_name`` and
        ``daily_return_rate``. The earliest row of the period has no previous
        close, so its rate is null.
    """
    # Load the data from the SQL table
    df = (
        spark.read.format("jdbc")
        .option("url", db_url)
        .option("dbtable", "combined_stock_data")
        .option("user", db_properties["user"])
        .option("password", db_properties["password"])
        # Pass the driver class declared in the connection settings
        .option("driver", db_properties["driver"])
        .load()
    )

    # Keep only the requested stock and period
    filtered_df = df.filter((col("stock_name") == stock_name) & (col("Date").between(start_date, end_date)))

    # Partition by stock so the lag never crosses stocks and Spark does not
    # warn about an unpartitioned window; build the lag expression only once.
    date_window = Window.partitionBy("stock_name").orderBy("Date")
    previous_close = lag("Close").over(date_window)

    # Daily return rate relative to the previous row's close
    return_df = filtered_df.withColumn("daily_return_rate", (col("Close") - previous_close) / previous_close)

    return return_df.select("Date", "stock_name", "daily_return_rate")


In [0]:
def moving_average(stock_name, start_date, end_date, window_size):
    """Compute a trailing moving average of a stock's opening price.

    Reads the ``combined_stock_data`` table over JDBC, keeps the rows whose
    ``stock_name`` equals ``stock_name`` and whose ``Date`` lies between
    ``start_date`` and ``end_date`` (both bounds inclusive), and averages
    ``Open`` over the current row and the ``window_size - 1`` rows before it,
    ordered by ``Date``.

    Relies on the notebook-level ``spark`` session, ``db_url`` and
    ``db_properties``.

    Args:
        stock_name: Value matched against the ``stock_name`` column.
        start_date: Lower bound of the period (inclusive).
        end_date: Upper bound of the period (inclusive).
        window_size: Number of rows averaged, current row included; must be
            at least 1.

    Returns:
        A DataFrame with the columns ``Date``, ``stock_name``, ``Open`` and
        ``moving_average``. Rows near the start of the period are averaged
        over the fewer rows that are available.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError(f"window_size must be >= 1, got {window_size}")

    # Load the data from the SQL table
    df = (
        spark.read.format("jdbc")
        .option("url", db_url)
        .option("dbtable", "combined_stock_data")
        .option("user", db_properties["user"])
        .option("password", db_properties["password"])
        # Pass the driver class declared in the connection settings
        .option("driver", db_properties["driver"])
        .load()
    )

    # Keep only the requested stock and period
    filtered_df = df.filter((col("stock_name") == stock_name) & (col("Date").between(start_date, end_date)))

    # The frame spans exactly window_size rows: the current row plus the
    # window_size - 1 preceding ones (starting at -window_size would average
    # window_size + 1 rows).
    window_spec = (
        Window.partitionBy("stock_name")
        .orderBy("Date")
        .rowsBetween(-(window_size - 1), Window.currentRow)
    )

    # Moving average of the opening price
    moving_avg_df = filtered_df.withColumn("moving_average", avg("Open").over(window_spec))
    return moving_avg_df.select("Date", "stock_name", "Open", "moving_average")


In [0]:
# Create (or reuse) the Spark session
spark = SparkSession.builder.appName("DatabricksActivity").getOrCreate()

# Compute both indicators for the requested stock and period
result_daily_return = daily_return_rate("AMAZON", "2017-01-05", "2018-01-05")
result_moving_average = moving_average("AMAZON", "2017-01-05", "2018-01-05", 5)

# Display the results
result_daily_return.show()
result_moving_average.show()

# Define the output file names
output_file_daily_return = "daily_return.csv"
output_file_moving_average = "moving_average.csv"

# Azure Storage account details. The account key is a secret and must not be
# stored in the notebook: it is read from the LAB2_STORAGE_ACCOUNT_KEY
# environment variable instead.
storage_account_name = "lab2dl"
container_name = "output"
storage_account_key = os.environ.get("LAB2_STORAGE_ACCOUNT_KEY")
if not storage_account_key:
    raise RuntimeError(
        "LAB2_STORAGE_ACCOUNT_KEY is not set; it must hold the access key "
        f"of the '{storage_account_name}' storage account."
    )

# Set the Azure Storage account key in the Spark configuration
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", storage_account_key)

# Common prefix of both output locations (container root, no sub-folder)
output_root = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net"

try:
    # Write both DataFrames as CSV with a header row, replacing any previous output
    result_daily_return.write.csv(f"{output_root}/{output_file_daily_return}", header=True, mode="overwrite")
    result_moving_average.write.csv(f"{output_root}/{output_file_moving_average}", header=True, mode="overwrite")
    print("Data written successfully to Azure Storage.")
except Exception as e:
    print(f"Error: {str(e)}")
    # Re-raise so a failed export is not reported as a successful run
    raise
