In [1]:
!pip install pmdarima
!pip install statsmodels

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [2]:
import os
import pandas as pd
import pmdarima as pm
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import statsmodels.api as sm

In [None]:
# Step 0: Initialize Spark — a local session on every available core (local[*])
# with 4g of driver memory.
spark = (
    SparkSession.builder
    .appName("UnemploymentForecast")
    .master("local[*]")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Read the large tab-separated file: the first row supplies column names and
# Spark infers each column's type.
# NOTE(review): inferSchema makes Spark scan the input an extra time; an explicit
# schema would avoid that pass on a large file.
data_path = "all_filtered_data.txt"
df = spark.read.csv(
    data_path,
    header=True,
    sep='\t',
    inferSchema=True,
)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/27 23:00:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [4]:
# Show the column names Spark read from the file header.
[field.name for field in df.schema.fields]

['series_id', 'year', 'period', 'value', 'footnote_codes', 'state_name']

In [None]:
# Register the DataFrame as a temporary SQL view so it can be queried via spark.sql.
df.createOrReplaceTempView("unemployment_data")
print("Data loaded. Starting processing.")

# Retrieve the distinct state names.
# - IS NOT NULL: a NULL name is not a usable key for per-state filtering.
# - ORDER BY: DISTINCT alone has no defined order; sorting makes the list (and any
#   loop over it) reproducible across runs.
state_rows = spark.sql("""
    SELECT DISTINCT state_name
    FROM unemployment_data
    WHERE state_name IS NOT NULL
    ORDER BY state_name
""").collect()

# Unwrap the Row objects into plain strings.
state_names = [row['state_name'] for row in state_rows]

print(f"Found {len(state_names)} states.")
state_names


In [None]:
# Collect the selected columns for every state into one pandas frame on the driver.
# NOTE(review): toPandas() materializes the whole dataset in driver memory — confirm
# it fits for the full input file.
full_df = df.select('year', 'period', 'value', 'series_id', 'state_name').toPandas()
grouped = full_df.groupby('state_name')

# A bare GroupBy object only renders as an opaque repr; show rows per state instead,
# which doubles as a quick completeness check.
grouped.size()

                                                                                

In [None]:
# Step 3: fit a seasonal ARIMA model for each series of each state and forecast the
# next N_PERIODS months; each state's forecasts are written to RESULT_DIR.
N_PERIODS = 12                   # forecast horizon, in months
ARIMA_ORDER = (1, 1, 1)          # non-seasonal (p, d, q)
SEASONAL_ORDER = (1, 1, 1, 12)   # seasonal (P, D, Q, s); s=12 for monthly data
RESULT_DIR = "result"

# DataFrame.to_csv does not create missing directories; without this every save
# fails when RESULT_DIR does not exist yet.
os.makedirs(RESULT_DIR, exist_ok=True)

all_forecasts = []  # one forecast DataFrame per state that produced any forecast


def _prepare_monthly_series(raw):
    """Clean one series' raw rows into a time-ordered monthly frame.

    Parses ``period`` strings of the form ``'Mnn'`` into a month number, drops
    month 13 rows, builds a month-start ``timestamp`` column, coerces ``value``
    to numeric (dropping rows that fail), and returns the rows sorted by
    ``timestamp`` with a fresh 0..n-1 index.
    """
    month = raw['period'].astype(str).str.replace('M', '', regex=False).astype(int)
    monthly = raw.assign(month=month)
    monthly = monthly[monthly['month'] != 13]  # M13 rows are excluded from the monthly series
    # assign() returns a new frame, avoiding SettingWithCopyWarning on the filtered slice.
    monthly = monthly.assign(
        timestamp=pd.to_datetime(
            monthly['year'].astype(str) + '-' + monthly['month'].astype(str) + '-01',
            format='%Y-%m-%d',
        ),
        value=pd.to_numeric(monthly['value'], errors='coerce'),
    )
    # reset_index gives statsmodels a plain RangeIndex; the gappy integer index left
    # by the filters would otherwise trigger "unsupported index" warnings.
    return (
        monthly.dropna(subset=['value'])
        .sort_values('timestamp')
        .reset_index(drop=True)
    )


def _forecast_series(series_df, state, series_id, n_periods=N_PERIODS):
    """Fit SARIMA on ``series_df['value']`` and return an ``n_periods``-month forecast.

    The returned frame has columns series_id, year, period ('Mnn'), value and
    state_name, starting the month after the last observed ``timestamp``.
    Model-fitting errors propagate to the caller.
    """
    fitted = sm.tsa.ARIMA(
        series_df['value'],
        order=ARIMA_ORDER,
        seasonal_order=SEASONAL_ORDER,
    ).fit()
    forecast = fitted.forecast(steps=n_periods)

    future_dates = pd.date_range(
        series_df['timestamp'].max() + pd.offsets.MonthBegin(1),
        periods=n_periods,
        freq='MS',
    )
    return pd.DataFrame({
        'series_id': [series_id] * n_periods,
        'year': future_dates.year,
        'period': ['M' + str(month).zfill(2) for month in future_dates.month],
        # to_numpy() drops the forecast's own index so it cannot drive alignment.
        'value': pd.Series(forecast).to_numpy(),
        'state_name': [state] * n_periods,
    })


for state in state_names:
    # Column-expression filter rather than an f-string SQL literal: no quoting
    # problems if a state name ever contains an apostrophe.
    state_pdf = (
        df.filter(col('state_name') == state)
        .select('year', 'period', 'value', 'series_id')
        .toPandas()
    )

    if state_pdf.empty:
        print(f"{state} data is empty, skipping.")
        continue

    # The query does not restrict series_id. If a state holds several series,
    # fitting them as one would interleave unrelated values, so model each one
    # separately (a single-series state behaves exactly as before).
    state_forecasts = []
    for series_id, series_raw in state_pdf.groupby('series_id', sort=True, dropna=False):
        series_df = _prepare_monthly_series(series_raw)
        try:
            state_forecasts.append(_forecast_series(series_df, state, series_id))
        except Exception as e:  # best-effort: one failing series must not stop the run
            print(f"{state} ARIMA modeling failed, skipping. Error: {e}")

    if not state_forecasts:
        continue

    forecast_df = pd.concat(state_forecasts, ignore_index=True)
    all_forecasts.append(forecast_df)
    print(f"{state} forecast completed.")

    # Saving sits outside the try above so I/O errors are not misreported as model failures.
    save_path = os.path.join(RESULT_DIR, f"{state}_predict.csv")
    forecast_df.to_csv(save_path, index=False)
    print(f"Result saved to: {save_path}")
