In [0]:
%sql
-- Day-level calendar dimension. The table is (re)created empty here and filled
-- by the INSERT statement that follows in this cell; CREATE OR REPLACE means any
-- previous contents are discarded on every run.
CREATE OR REPLACE TABLE dad_open_data.time.temporal_dimension (
  -- Signed year number; values below 1 are rendered with a ' BC' suffix in iso_date.
  year INT,
  -- Month number, 1-12.
  month INT,
  -- Day of the month, 1-31.
  day INT,
  -- Quarter of the year, 1-4 (derived from the month by the loader).
  quarter INT,
  -- Week number as returned by Spark's weekofyear().
  week_of_year INT,
  -- Day number as returned by Spark's dayofweek(); 1 and 7 are the weekend days.
  day_of_week INT,
  -- Full day name produced by date_format(..., 'EEEE').
  day_name STRING,
  -- Full month name produced by date_format(..., 'MMMM').
  month_name STRING,
  -- Zero-padded 'YYYY-MM-DD' text; stored as STRING, not DATE, because rows
  -- with year < 1 carry a trailing ' BC'.
  iso_date STRING,
  -- Leap-year flag computed by the loader with the 4/100/400 divisibility rule.
  is_leap_year BOOLEAN,
  -- TRUE when day_of_week is 1 or 7.
  is_weekend BOOLEAN
);

-- Populate the calendar dimension with one row per valid calendar day for
-- astronomical years -3999 through 3000 (year 0 = 1 BC, year -1 = 2 BC, ...).
--
-- Spark dates start at 0001-01-01, so a BCE row cannot be turned into a DATE
-- directly. The proleptic Gregorian calendar repeats exactly every 400 years
-- (146,097 days = 20,871 whole weeks), so shifting a BCE year forward by 4000
-- years (ten full cycles) yields a representable date with the same weekday,
-- week number and leap-year status. That shifted date (calc_date) is used only
-- to derive the weekday/week/name attributes; year, month, day and iso_date
-- always carry the real year.
--
-- The leap-year rule is applied to every year, including BCE ones, so that
-- Feb 29 exists in years such as 0 and -4 and the weekday sequence stays
-- continuous across the BCE/CE boundary.
INSERT INTO dad_open_data.time.temporal_dimension
SELECT
  y AS year,
  m AS month,
  d AS day,
  -- Explicit cast: CEIL on a decimal does not return INT.
  CAST(CEIL(m / 3.0) AS INT) AS quarter,
  weekofyear(calc_date) AS week_of_year,
  dayofweek(calc_date) AS day_of_week,
  date_format(calc_date, 'EEEE') AS day_name,
  date_format(calc_date, 'MMMM') AS month_name,
  CASE
    -- BCE years are displayed in historical numbering: astronomical 0 -> 0001 BC.
    WHEN y < 1 THEN LPAD(ABS(y)+1, 4, '0') || '-' || LPAD(m,2,'0') || '-' || LPAD(d,2,'0') || ' BC'
    ELSE LPAD(y, 4, '0') || '-' || LPAD(m,2,'0') || '-' || LPAD(d,2,'0')
  END AS iso_date,
  leap AS is_leap_year,
  -- dayofweek(): 1 = Sunday, 7 = Saturday.
  dayofweek(calc_date) IN (1, 7) AS is_weekend
FROM (
  -- Build the helper date only after invalid (month, day) pairs are removed,
  -- so make_date never sees an impossible date.
  SELECT
    y,
    m,
    d,
    leap,
    make_date(CASE WHEN y < 1 THEN y + 4000 ELSE y END, m, d) AS calc_date
  FROM (
    SELECT
      y,
      m,
      d,
      -- Gregorian leap rule; "= 0" is sign-safe, so it also works for y <= 0.
      ((y % 4 = 0 AND y % 100 != 0) OR (y % 400 = 0)) AS leap
    FROM
      (
        SELECT explode(sequence(-3999, 3000)) AS y
      ) years
      CROSS JOIN (
        SELECT explode(sequence(1, 12)) AS m
      ) months
      CROSS JOIN (
        SELECT explode(sequence(1, 31)) AS d
      ) days
  ) candidates
  WHERE
    -- Filter out invalid dates
    (m IN (1,3,5,7,8,10,12) AND d <= 31)
    OR (m IN (4,6,9,11) AND d <= 30)
    OR (m = 2 AND d <= CASE WHEN leap THEN 29 ELSE 28 END)
) valid_days;

In [0]:
from pyspark.sql.functions import (
    col, date_format, dayofweek, dayofyear, weekofyear, quarter, month, dayofmonth, year, floor, pmod
)
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from datetime import datetime, timedelta
import pandas as pd

# Target table coordinates; the three parts are joined into catalog.schema.table
catalogo = "dad_open_data"
schema = "time"
tabella = "calendar"
full_table_name = ".".join([catalogo, schema, tabella])

# Ensure the catalog and schema exist, then drop any previous copy of the
# table. The statements run strictly in this order: CREATE SCHEMA uses an
# unqualified name and therefore depends on the preceding USE CATALOG.
istruzioni_setup = (
    f"CREATE CATALOG IF NOT EXISTS {catalogo}",
    f"USE CATALOG {catalogo}",
    f"CREATE SCHEMA IF NOT EXISTS {schema}",
    f"DROP TABLE IF EXISTS {full_table_name}",
)
for istruzione in istruzioni_setup:
    spark.sql(istruzione)

print("Generazione date dal 3999 a.C. al 2100 d.C...")

# Bounds of the CE portion of the calendar (0001-01-01 through 2100-12-31).
data_inizio = datetime(1, 1, 1)
data_fine = datetime(2100, 12, 31)

# Build the day-by-day series inside Spark instead of looping over ~767k
# Python datetime objects and passing them through pandas:
#   * pandas' default nanosecond datetimes cannot represent year 1, so the
#     previous round-trip depended on pandas falling back to object columns;
#   * Python datetime values become Spark timestamps, whereas the BCE rows
#     declare 'data' as DateType, so the later union widened the column;
#   * a DATE column has no time-zone component, so the extracted year/month/day
#     cannot shift with the session time zone.
# date.isoformat() always zero-pads the year to four digits ('0001-01-01').
df_date_supportate = spark.sql(
    "SELECT explode(sequence("
    f"DATE'{data_inizio.date().isoformat()}', "
    f"DATE'{data_fine.date().isoformat()}', "
    "INTERVAL 1 DAY)) AS data"
)

col_data = col('data')
mese_col = month(col_data)

# floor() yields bigint; cast the derived columns to int so they match the
# IntegerType fields declared for the BCE rows they are unioned with.
df_calendario_supportate = df_date_supportate.select(
    year(col_data).alias('anno'),
    mese_col.alias('mese'),
    dayofmonth(col_data).alias('giorno'),
    quarter(col_data).alias('trimestre'),
    # First month of the quarter: 1, 4, 7 or 10.
    (floor((mese_col - 1) / 3) * 3 + 1).cast('int').alias('mese_inizio_trimestre'),
    # Season index: 1 = Dec-Feb, 2 = Mar-May, 3 = Jun-Aug, 4 = Sep-Nov.
    (floor(pmod(mese_col, 12) / 3) + 1).cast('int').alias('stagione'),
    dayofweek(col_data).alias('giorno_settimana_num'),
    date_format(col_data, 'EEEE').alias('nome_giorno'),
    dayofyear(col_data).alias('giorno_anno'),
    weekofyear(col_data).alias('settimana_anno'),
    col_data
)

# Generate the BCE rows (astronomical years -3999 through 0). These dates
# cannot be stored as Spark dates, so every date-derived attribute is None.
_MESI_DA_31 = (1, 3, 5, 7, 8, 10, 12)
_MESI_DA_30 = (4, 6, 9, 11)


def _giorni_nel_mese(anno, mese):
    """Return the number of days in month ``mese`` of year ``anno``.

    February follows the 4/100/400 leap rule evaluated on the absolute
    value of the year.
    """
    if mese in _MESI_DA_31:
        return 31
    if mese in _MESI_DA_30:
        return 30
    anno_abs = abs(anno)
    bisestile = (anno_abs % 4 == 0 and anno_abs % 100 != 0) or (anno_abs % 400 == 0)
    return 29 if bisestile else 28


date_bce = [
    {
        'anno': anno,
        'mese': mese,
        'giorno': giorno,
        'trimestre': (mese - 1) // 3 + 1,
        'mese_inizio_trimestre': ((mese - 1) // 3) * 3 + 1,
        'stagione': (mese % 12) // 3 + 1,
        'giorno_settimana_num': None,
        'nome_giorno': None,
        'giorno_anno': None,
        'settimana_anno': None,
        'data': None
    }
    for anno in range(-3999, 1)  # from -3999 up to and including 0
    for mese in range(1, 13)
    for giorno in range(1, _giorni_nel_mese(anno, mese) + 1)
]

# Explicit schema: with the date-derived fields all None, their types could
# not be inferred from the data. Every field is nullable.
_campi_bce = (
    ('anno', IntegerType),
    ('mese', IntegerType),
    ('giorno', IntegerType),
    ('trimestre', IntegerType),
    ('mese_inizio_trimestre', IntegerType),
    ('stagione', IntegerType),
    ('giorno_settimana_num', IntegerType),
    ('nome_giorno', StringType),
    ('giorno_anno', IntegerType),
    ('settimana_anno', IntegerType),
    ('data', DateType),
)
schema_bce = StructType(
    [StructField(nome, tipo(), True) for nome, tipo in _campi_bce]
)
df_bce = spark.createDataFrame(date_bce, schema=schema_bce)

# Stack the BCE rows and the CE rows into a single calendar (columns are
# matched by name, not by position).
df_calendario_completo = df_bce.unionByName(df_calendario_supportate)

# Add the season name. 'stagione' is computed as (month % 12) // 3 + 1, so the
# index groups months as:
#   1 = Dec-Feb, 2 = Mar-May, 3 = Jun-Aug, 4 = Sep-Nov
# The labels must therefore start with winter (northern-hemisphere
# meteorological seasons); starting the list with 'Primavera' labelled
# December-February as spring and shifted every other season by one.
df_calendario_finale = df_calendario_completo.withColumn(
    'nome_stagione',
    col('stagione').cast('string')
).replace(
    ['1', '2', '3', '4'],
    ['Inverno', 'Primavera', 'Estate', 'Autunno'],
    subset=['nome_stagione']
)

# Final column layout of the calendar table and the chronological sort keys.
colonne_output = [
    'data',
    'anno',
    'mese',
    'giorno',
    'trimestre',
    'mese_inizio_trimestre',
    'stagione',
    'nome_stagione',
    'giorno_settimana_num',
    'nome_giorno',
    'giorno_anno',
    'settimana_anno',
]
chiavi_ordinamento = ['anno', 'mese', 'giorno']

df_calendario_finale = (
    df_calendario_finale
    .select(*colonne_output)
    .orderBy(*chiavi_ordinamento)
)

# Report how many rows were generated before persisting them.
total_records = df_calendario_finale.count()
print("Record totali generati: {:,}".format(total_records))

# Persist as a Delta table, overwriting whatever is already stored under that name.
scrittore = df_calendario_finale.write.format("delta").mode("overwrite")
scrittore.saveAsTable(full_table_name)

print("✓ Tabella " + full_table_name + " creata con successo!")