In [None]:
!pip install google-cloud-bigquery pandas nixla pyspark
!pip install db-dtypes
!pip install window_ops.expanding 

In [11]:
import os

# Deployment-specific identifiers are read from the environment so that they
# are never hard-coded in the notebook. Both are None when the variable is unset.
PROJECT_ID = os.getenv("PROJECT_ID")
BIGQUERY_DB_ID = os.getenv("BIGQUERY_DB_ID")

# Echo the configuration with plain Python instead of `!echo`: same output,
# but no shell is spawned and the values are never interpreted by a shell.
print(f"Project ID: {PROJECT_ID}")
print(f"BigQuery DB ID: {BIGQUERY_DB_ID}")


Project ID: <insert project id>


BigQuery DB ID: <insert DB>


Read the data from BigQuery into a pandas DataFrame

In [4]:
import pandas as pd
from google.cloud import bigquery
import db_dtypes

# Set up BigQuery client
client = bigquery.Client()

# SQL query to fetch sales data.
# ORDER BY makes the LIMIT deterministic: without it BigQuery is free to
# return any 1000 matching rows, so the sample would change between runs and
# leave arbitrary gaps inside each product's history. Ordering by
# (product_id, sale_date) keeps the sampled rows contiguous per product.
query = """
SELECT sale_date, product_id, quantity_sold
FROM `trendflow-455409.trendflow.sales_history`
WHERE sale_date BETWEEN '2023-01-01' AND '2024-03-01'
ORDER BY product_id, sale_date
LIMIT 1000
"""

# Load the result, parse the date column and sort, all in one chain so that
# re-running this cell always rebuilds `sales_data` from the query result.
sales_data = (
    client.query(query)
    .to_dataframe()
    .assign(sale_date=lambda frame: pd.to_datetime(frame["sale_date"]))
    .sort_values(["product_id", "sale_date"])
)

# Check the first rows (bare expression -> rich DataFrame display)
sales_data.head()


ValueError: Please install the 'db-dtypes' package to use this function.

Train Forecasting Model using Nixtla MLForecast

In [None]:
from mlforecast import MLForecast
from mlforecast.target_transforms import Differences
from sklearn.ensemble import RandomForestRegressor
from window_ops.expanding import expanding_mean

# One regressor is trained; the seed is fixed via random_state
models = [RandomForestRegressor(random_state=42, n_estimators=100)]

# Collect the MLForecast configuration in one place, then build the forecaster
forecast_settings = dict(
    models=models,
    freq='D',                               # daily frequency is assumed
    lags=[7, 14, 30],                       # weekly, bi-weekly and monthly lags
    target_transforms=[Differences([1])],   # first-order differencing to remove trends
    date_features=['dayofweek', 'month'],   # calendar features
)
fcst = MLForecast(**forecast_settings)

# Fit on the sales history, mapping the dataset's column names onto the
# id / time / target roles expected by MLForecast
fcst.fit(
    df=sales_data,
    id_col="product_id",
    time_col="sale_date",
    target_col="quantity_sold",
)

print("✅ Model training complete!")


In [None]:
# Generate Forecasts

In [None]:
# Predict for 30 days ahead
forecast_horizon = 30

# MLForecast.predict has no `fh` keyword (the horizon parameter is `h`);
# passing it positionally avoids the TypeError.
forecast_raw = fcst.predict(forecast_horizon)

# Rename columns for clarity.
# The forecaster was fitted with time_col="sale_date", so the prediction frame
# carries the dates under "sale_date" (not "ds"), and MLForecast names each
# prediction column after the model class, here "RandomForestRegressor"
# (not "y_pred"). Renaming into a new frame keeps this cell idempotent.
forecast_df = forecast_raw.rename(
    columns={
        "sale_date": "forecast_date",
        "RandomForestRegressor": "predicted_sales",
    }
)

# Display some predictions
forecast_df.head()


Store Predictions in BigQuery

In [None]:
# Define destination table from the notebook configuration instead of a
# placeholder that can never resolve to a real table.
# NOTE(review): assumes BIGQUERY_DB_ID holds the BigQuery dataset name — confirm.
if not (PROJECT_ID and BIGQUERY_DB_ID):
    raise ValueError(
        "PROJECT_ID and BIGQUERY_DB_ID must be set to build the destination table id"
    )
table_id = f"{PROJECT_ID}.{BIGQUERY_DB_ID}.sales_forecasts"

# Convert forecast date to datetime in a copy, so the frame produced by the
# forecasting cell is not mutated from here.
forecast_upload = forecast_df.assign(
    forecast_date=pd.to_datetime(forecast_df["forecast_date"])
)

# Upload predictions to BigQuery and block until the load job finishes.
# NOTE(review): no write disposition is specified, so re-running this cell
# presumably appends to an existing table — confirm that is intended.
client.load_table_from_dataframe(forecast_upload, table_id).result()

print("✅ Forecast results saved to BigQuery!")


In [None]:
# Disabled draft: an Airflow DAG that would run fetch -> train -> predict ->
# store once a day. Every line in this cell is commented out, so the cell has
# no effect when the notebook runs.
# NOTE(review): the draft imports `ForecastModel` from `nixla`, while the
# active cells of this notebook use `mlforecast.MLForecast` — confirm which
# API is intended before enabling any of this.

# from airflow import DAG
# from airflow.operators.python import PythonOperator
# from datetime import datetime
# from google.cloud import bigquery
# import pandas as pd
# from nixla import ForecastModel

# def fetch_train_predict():
#     client = bigquery.Client()
    
#     # Fetch the data from BigQuery
#     query = "SELECT * FROM `my_project.my_dataset.unified_sales_data`"
#     df_sales = client.query(query).to_dataframe()
    
#     # Train the Nixla model
#     model = ForecastModel(model="NeuralProphet")
#     model.fit(df_sales, target="sales")
    
#     # Predict sales
#     predictions = model.predict(future_periods=30)
    
#     # Store the predictions in BigQuery
#     table_id = "my_project.my_dataset.predictions"
#     job = client.load_table_from_dataframe(predictions, table_id)
#     job.result()
    
#     print("Prédictions mises à jour dans BigQuery.")

# # DAG definition
# default_args = {
#     'owner': 'airflow',
#     'start_date': datetime(2024, 4, 1),
#     'retries': 1
# }

# dag = DAG(
#     dag_id='fetch_train_predict_nixla',
#     default_args=default_args,
#     schedule_interval='@daily'
# )

# task = PythonOperator(
#     task_id='run_nixla_forecast',
#     python_callable=fetch_train_predict,
#     dag=dag
# )
