IMPORT HDFS FILE

In [1]:
import pandas as pd
from hdfs import InsecureClient

# Build the HDFS web client used by the loading steps: it talks to the
# endpoint at localhost:9870 and acts as the "root" user.
hdfs_client = InsecureClient(
    url="http://localhost:9870",
    user="root",
)

=== Importing Data from HDFS ===

1. Delivery Data:
Loading Delivery...
  Found 5 CSV file(s): ['delivery_cq.csv', 'delivery_hz.csv', 'delivery_jl.csv', 'delivery_sh.csv', 'delivery_yt.csv']
    Reading delivery_cq.csv... ✓ (931351 rows)
    Reading delivery_hz.csv... ✓ (1861600 rows)
    Reading delivery_jl.csv... ✓ (31415 rows)
    Reading delivery_sh.csv... ✓ (1483864 rows)
    Reading delivery_yt.csv... ✓ (206431 rows)
  ✓ Combined: 4514661 total rows, 17 columns

2. PickUp Data:
Loading PickUp...
  Found 5 CSV file(s): ['pickup_cq.csv', 'pickup_hz.csv', 'pickup_jl.csv', 'pickup_sh.csv', 'pickup_yt.csv']
    Reading pickup_cq.csv...   ✓ Combined: 4514661 total rows, 17 columns

2. PickUp Data:
Loading PickUp...
  Found 5 CSV file(s): ['pickup_cq.csv', 'pickup_hz.csv', 'pickup_jl.c

QUICK PREVIEW DATA

In [2]:
import pandas as pd

print("=== Quick Preview: First 5 Rows All Columns (from HDFS) ===\n")

# Lift pandas' display limits: unlimited column count and width, with cell
# text capped at 30 characters.
for option_name, option_value in (
    ('display.max_columns', None),
    ('display.width', None),
    ('display.max_colwidth', 30),
):
    pd.set_option(option_name, option_value)


def _print_preview(heading, frame, with_separator=True):
    """Print a heading, the frame's shape and column names, then its first 5 rows.

    When ``with_separator`` is true, a blank line followed by a 100-character
    ``=`` rule is printed before the heading.
    """
    if with_separator:
        print("\n" + "="*100)
    print(heading)
    print(f"   Total Shape: {frame.shape}")
    print(f"   Columns: {list(frame.columns)}\n")
    print(frame.head(5).to_string())


# One call per loaded dataset; only the first preview omits the separator rule.
_print_preview("1. DELIVERY DATA - First 5 rows, ALL columns:", df_delivery, with_separator=False)
_print_preview("2. PICKUP DATA - First 5 rows, ALL columns:", df_pickup)
_print_preview("3. ROADMAP DATA - First 5 rows, ALL columns:", df_roadmap)

=== Quick Preview: First 5 Rows All Columns (from HDFS) ===

1. DELIVERY DATA - First 5 rows, ALL columns:
   Total Shape: (4514661, 17)
   Columns: ['order_id', 'region_id', 'city', 'courier_id', 'lng', 'lat', 'aoi_id', 'aoi_type', 'accept_time', 'accept_gps_time', 'accept_gps_lng', 'accept_gps_lat', 'delivery_time', 'delivery_gps_time', 'delivery_gps_lng', 'delivery_gps_lat', 'ds']

   order_id  region_id       city  courier_id        lng       lat  aoi_id  aoi_type     accept_time accept_gps_time  accept_gps_lng  accept_gps_lat   delivery_time delivery_gps_time  delivery_gps_lng  delivery_gps_lat    ds
0   2031782         10  Chongqing          73  108.71571  30.90228      50        14  10-22 10:26:00  10-22 10:26:00       108.71826        30.95587  10-22 17:04:00    10-22 17:04:00         108.66361          30.96702  1022
1   4285071         10  Chongqing        3605  108.71639  30.90269      50        14  09-07 10:13:00  09-07 10:13:00       108.71791        30.95635  09-09 15:44:

STG

In [8]:
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import warnings
# Suppress every warning for the rest of the session.
# NOTE(review): this is a blanket filter, so it also hides pandas deprecation
# and dtype warnings - consider narrowing it to specific categories.
warnings.filterwarnings('ignore')

print("=== STG (SEASONAL TIME SERIES) FORECASTING ===\n")

# 1. SAMPLE DATA FOR FASTER PROCESSING
print("1. Sampling Data for STG Analysis...")

# Aim for 1/10 of the delivery data with a floor of 100,000 rows.
# DataFrame.sample(n=...) draws without replacement by default and raises
# ValueError when n exceeds the number of rows, so the target is capped at the
# size of the frame (small inputs are then used in full).
sample_size = min(len(df_delivery), max(100000, len(df_delivery) // 10))

# Fixed random_state keeps the sample identical across runs.
df_sample = df_delivery.sample(n=sample_size, random_state=42)

print(f"   Sampled {len(df_sample):,} records from {len(df_delivery):,} total")

# Parse delivery times
def parse_delivery_time(x, year=2024):
    """Parse a year-less ``MM-DD HH:MM:SS`` string into a pandas Timestamp.

    The source strings carry no year, so ``year`` is prepended before parsing.

    Parameters
    ----------
    x : str or missing value
        Delivery time formatted as ``MM-DD HH:MM:SS``.
    year : int, default 2024
        Year to attach to the parsed timestamp.

    Returns
    -------
    pandas.Timestamp or pandas.NaT
        The parsed timestamp, or ``NaT`` when ``x`` is missing or does not
        match the expected format (including impossible dates such as 02-30).
    """
    if pd.isna(x):
        # Same sentinel as the unparsable case, so callers only ever see
        # Timestamp or NaT.
        return pd.NaT
    try:
        # errors='coerce' already maps format mismatches to NaT.
        return pd.to_datetime(f"{year}-{x}", format='%Y-%m-%d %H:%M:%S', errors='coerce')
    except (ValueError, TypeError, OverflowError):
        # Defensive: only parsing-related failures are treated as "no value";
        # anything else (e.g. KeyboardInterrupt) propagates.
        return pd.NaT

# Parse every sampled delivery time; rows that cannot be parsed end up missing.
parsed_delivery_times = df_sample['delivery_time'].apply(parse_delivery_time)
df_sample['delivery_datetime'] = parsed_delivery_times

# Report how many rows produced a usable timestamp.
valid_count = df_sample['delivery_datetime'].notna().sum()
print(f"   Successfully parsed {valid_count} datetime values")

# Work on an independent copy holding only the rows with a timestamp.
df_valid = df_sample.dropna(subset=['delivery_datetime']).copy()

print(f"   Valid records with delivery times: {len(df_valid):,}")
if len(df_valid) == 0:
    print("   No valid delivery times found\n")
else:
    print(f"   Date range: {df_valid['delivery_datetime'].min()} to {df_valid['delivery_datetime'].max()}\n")

# 2. AGGREGATE BY CITY AND DATE
print("2. Aggregating Delivery Volume by City and Date...")

# Truncate each timestamp to midnight; the column stays datetime-typed so the
# .dt accessor remains usable downstream.
df_valid['delivery_date'] = df_valid['delivery_datetime'].dt.normalize()

# One row per (city, day) holding the number of deliveries on that day.
daily_stats = (
    df_valid
    .groupby(['city', 'delivery_date'])
    .size()
    .reset_index(name='delivery_count')
)

print(f"   Daily aggregated records: {len(daily_stats):,}\n")

# 3. TIME SERIES DECOMPOSITION BY CITY
print("3. Time Series Decomposition by City...")

cities = df_valid['city'].unique()
decomposition_results = []

for city in cities:
    is_city = daily_stats['city'] == city
    city_data = daily_stats[is_city].sort_values('delivery_date').copy()

    # Nothing to summarise for a city without any daily rows.
    if len(city_data) == 0:
        continue

    counts = city_data['delivery_count']
    avg_daily = counts.mean()

    # "Trend" is the final value of a 7-row moving average, i.e. the mean of
    # the last 7 daily rows; with fewer than 7 rows the overall mean is used.
    if len(city_data) < 7:
        trend = avg_daily
    else:
        trend = counts.rolling(window=7, min_periods=1).mean().iloc[-1]

    decomposition_results.append(dict(
        city=city,
        total_deliveries=int(counts.sum()),
        avg_daily=round(avg_daily, 1),
        max_daily=int(counts.max()),
        min_daily=int(counts.min()),
        std_dev=round(counts.std(), 1),
        trend=round(trend, 1),
        days_covered=len(city_data),
    ))

df_decomp = pd.DataFrame(decomposition_results)
print(f"   ✓ Decomposed {len(df_decomp)} cities\n")

# 4. SEASONAL FORECASTING
print("4. Seasonal Forecasting for Next 7 Days...")

forecast_results = []
weekday_labels = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']

# Seeded generator for the +/-5% jitter applied to each forecast value, so the
# forecast (and every file exported from it) is identical on every run.
# NOTE(review): the jitter adds no information to the forecast - consider
# dropping it altogether.
forecast_rng = np.random.default_rng(42)

for city in cities:
    city_data = daily_stats[daily_stats['city'] == city].sort_values('delivery_date').copy()

    # Cities with fewer than 7 daily rows are not forecast.
    if len(city_data) < 7:
        continue

    # Day-of-week seasonality: mean count per weekday (Mon=0 .. Sun=6)
    # relative to the city's overall daily mean.
    city_data['date_obj'] = pd.to_datetime(city_data['delivery_date'])
    city_data['day_of_week'] = city_data['date_obj'].dt.dayofweek

    dow_avg = city_data.groupby('day_of_week')['delivery_count'].mean()
    overall_avg = city_data['delivery_count'].mean()
    seasonal_factors = (dow_avg / overall_avg).to_dict()

    # Forecast the 7 days following the last observed date.
    start_date = city_data['delivery_date'].max() + timedelta(days=1)

    for day_offset in range(7):
        forecast_date = start_date + timedelta(days=day_offset)
        dow = forecast_date.weekday()

        # Weekdays never observed for this city fall back to a neutral factor.
        seasonal_factor = seasonal_factors.get(dow, 1.0)
        forecast_value = overall_avg * seasonal_factor
        forecast_value = forecast_value * forecast_rng.uniform(0.95, 1.05)

        forecast_results.append({
            'city': city,
            'forecast_date': forecast_date,
            'day_of_week': weekday_labels[dow],
            'seasonal_factor': round(seasonal_factor, 2),
            'forecast_deliveries': int(forecast_value)  # truncates toward zero
        })

df_forecast = pd.DataFrame(forecast_results)
print(f"   ✓ Generated {len(df_forecast)} forecast records\n")

# 5. DISPLAY DECOMPOSITION
print("5. Time Series Decomposition Summary:")
print(df_decomp.to_string(index=False))

# Sections 6 and 7 print only their headings when no forecast was produced.
has_forecast = len(df_forecast) > 0

# 6. DISPLAY FORECAST SAMPLE
print("\n6. 7-Day Forecast Sample:")
if has_forecast:
    print("   First 15 forecast records:")
    print(df_forecast.head(15).to_string(index=False))

# 7. AGGREGATE FORECAST BY CITY
print("\n7. Aggregated 7-Day Forecast by City:")
if has_forecast:
    # Per-city total, mean, minimum and maximum of the forecast values.
    forecast_summary = (
        df_forecast
        .groupby('city')
        .agg({'forecast_deliveries': ['sum', 'mean', 'min', 'max']})
        .round(1)
    )
    forecast_summary.columns = ['Total_7D', 'Avg_Daily', 'Min_Daily', 'Max_Daily']
    print(forecast_summary)

# 8. TREND ANALYSIS
print("\n8. Trend Analysis:")
for _, row in df_decomp.iterrows():
    # Compare the recent 7-row average ("trend") with the city's overall daily
    # average. Measuring against min_daily, as before, could never produce a
    # negative value, and the old threshold chain labelled 5-10% growth as
    # "Declining".
    baseline = row['avg_daily']
    if baseline > 0:
        growth = (row['trend'] - baseline) / baseline * 100
    else:
        growth = 0

    # More than 10% above the average is growth, more than 10% below is
    # decline, anything in between is stable.
    if growth > 10:
        status = "↑ Growing"
    elif growth < -10:
        status = "↓ Declining"
    else:
        status = "→ Stable"
    print(f"   {row['city']:15} Trend: {row['trend']:7.1f} avg/day {status}")

# 9. SEASONALITY INSIGHTS
print("\n9. Seasonality Insights (Day-of-Week Patterns):")
for city in sorted(cities):
    city_data = daily_stats[daily_stats['city'] == city].copy()
    city_data['date_obj'] = pd.to_datetime(city_data['delivery_date'])
    city_data['day_of_week'] = city_data['date_obj'].dt.day_name()

    # Mean daily count for each weekday name.
    dow_pattern = city_data.groupby('day_of_week')['delivery_count'].mean()
    if len(dow_pattern) == 0:
        continue

    peak_day, low_day = dow_pattern.idxmax(), dow_pattern.idxmin()
    peak_value, low_value = dow_pattern.max(), dow_pattern.min()

    print(f"\n   {city}:")
    print(f"      Peak: {peak_day} ({peak_value:.0f} deliveries)")
    print(f"      Low: {low_day} ({low_value:.0f} deliveries)")
    # The relative spread is printed only when the low value is positive.
    if low_value > 0:
        print(f"      Variation: {((peak_value - low_value) / low_value * 100):.1f}%")

print("\n✓ STG FORECASTING COMPLETE!")

=== STG (SEASONAL TIME SERIES) FORECASTING ===

1. Sampling Data for STG Analysis...
   Sampled 451,466 records from 4,514,661 total
   Successfully parsed 451466 datetime values
   Valid records with delivery times: 451,466
   Date range: 2024-05-01 07:55:00 to 2024-11-16 17:53:00

2. Aggregating Delivery Volume by City and Date...
   Daily aggregated records: 915

3. Time Series Decomposition by City...
   ✓ Decomposed 5 cities

4. Seasonal Forecasting for Next 7 Days...
   ✓ Generated 35 forecast records

5. Time Series Decomposition Summary:
     city  total_deliveries  avg_daily  max_daily  min_daily  std_dev  trend  days_covered
Chongqing             93003      479.4       1381          1    250.5    1.9           194
    Jilin       

VISUALIZE

In [9]:
import matplotlib.pyplot as plt
import seaborn as sns
import os

print("=== VISUALIZATION & EXPORT ===\n")

# Create output directory
# The original absolute path remains the default, but it only exists on one
# machine; set the STG_OUTPUT_DIR environment variable to write elsewhere.
output_dir = os.environ.get(
    "STG_OUTPUT_DIR",
    "/home/sirin/BIGDATA/Optimize-Delivery/optimize/result",
)
os.makedirs(output_dir, exist_ok=True)
print(f"✓ Output directory: {output_dir}\n")

# 1. EXPORT DATAFRAMES TO CSV
print("1. Exporting Data to CSV...")

# Per-city summary metrics.
decomp_path = f"{output_dir}/decomposition_summary.csv"
df_decomp.to_csv(decomp_path, index=False)
print(f"   ✓ Decomposition: {decomp_path}")

# 7-day forecast records.
forecast_path = f"{output_dir}/forecast_7days.csv"
df_forecast.to_csv(forecast_path, index=False)
print(f"   ✓ Forecast: {forecast_path}")

# Daily delivery counts per city.
daily_stats_path = f"{output_dir}/daily_stats.csv"
daily_stats.to_csv(daily_stats_path, index=False)
print(f"   ✓ Daily Stats: {daily_stats_path}\n")

# 2. VISUALIZATION - DECOMPOSITION BY CITY
print("2. Creating Visualizations...")

fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('STG Forecasting Analysis - Decomposition Summary', fontsize=16, fontweight='bold')

# Cities ordered by total volume; all four panels share this ordering.
df_decomp_sorted = df_decomp.sort_values('total_deliveries', ascending=False)

# Row-major unpacking: top-left, top-right, bottom-left, bottom-right.
ax1, ax2, ax3, ax4 = axes.ravel()

# (axis, column to plot, bar colour, x-axis label, panel title)
panel_specs = [
    (ax1, 'total_deliveries', 'steelblue', 'Total Deliveries', 'Total Deliveries by City'),
    (ax2, 'avg_daily', 'forestgreen', 'Average Daily Deliveries', 'Average Daily Deliveries by City'),
    (ax3, 'trend', 'darkorange', '7-Day Trend', 'Delivery Trend (7-Day Moving Average)'),
    (ax4, 'std_dev', 'crimson', 'Standard Deviation', 'Delivery Variability by City'),
]

for panel_ax, column, bar_color, x_label, panel_title in panel_specs:
    panel_ax.barh(df_decomp_sorted['city'], df_decomp_sorted[column], color=bar_color)
    panel_ax.set_xlabel(x_label)
    panel_ax.set_title(panel_title)
    panel_ax.grid(axis='x', alpha=0.3)

plt.tight_layout()
decomp_viz_path = f"{output_dir}/decomposition_analysis.png"
plt.savefig(decomp_viz_path, dpi=300, bbox_inches='tight')
# Close the figure after saving so it is neither displayed nor kept in memory.
plt.close()
print(f"   ✓ Decomposition visualization: {decomp_viz_path}")

# 3. VISUALIZATION - 7-DAY FORECAST BY CITY
forecast_viz_path = f"{output_dir}/forecast_7days_trend.png"

# df_forecast has no rows (and no columns) when no city had at least 7 daily
# rows; pivot_table would then fail on the missing columns, so the chart is
# skipped, matching the empty-forecast guards in the forecasting cell.
if len(df_forecast) > 0:
    fig, ax = plt.subplots(figsize=(14, 8))

    # Wide layout: one row per forecast date, one column per city.
    forecast_by_city_date = df_forecast.pivot_table(
        values='forecast_deliveries',
        index='forecast_date',
        columns='city',
        aggfunc='sum'
    )

    forecast_by_city_date.plot(kind='line', ax=ax, marker='o', linewidth=2)
    ax.set_xlabel('Forecast Date', fontsize=12)
    ax.set_ylabel('Forecast Deliveries', fontsize=12)
    ax.set_title('7-Day Delivery Forecast by City', fontsize=14, fontweight='bold')
    # Legend sits outside the axes; bbox_inches='tight' keeps it in the saved image.
    ax.legend(title='City', bbox_to_anchor=(1.05, 1), loc='upper left')
    ax.grid(True, alpha=0.3)
    ax.tick_params(axis='x', rotation=45)
    fig.tight_layout()

    fig.savefig(forecast_viz_path, dpi=300, bbox_inches='tight')
    plt.close(fig)
    print(f"   ✓ Forecast trend visualization: {forecast_viz_path}")
else:
    print("   ⚠ No forecast records available - forecast trend visualization skipped")

# 4. VISUALIZATION - SEASONALITY HEATMAP
fig, ax = plt.subplots(figsize=(12, 6))

# One matrix row per city (alphabetical), one column per weekday (Mon=0 .. Sun=6).
seasonality_matrix = []
city_labels = []

for city in sorted(cities):
    city_data = daily_stats[daily_stats['city'] == city].copy()
    city_data['date_obj'] = pd.to_datetime(city_data['delivery_date'])
    city_data['day_of_week'] = city_data['date_obj'].dt.dayofweek
    dow_pattern = city_data.groupby('day_of_week')['delivery_count'].mean()

    # Weekdays with no observations for this city are shown as 0.
    seasonality_matrix.append([dow_pattern.get(weekday, 0) for weekday in range(7)])
    city_labels.append(city)

heatmap_options = dict(
    annot=True,
    fmt='.0f',
    cmap='YlOrRd',
    xticklabels=['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'],
    yticklabels=city_labels,
    ax=ax,
    cbar_kws={'label': 'Avg Deliveries'},
)
sns.heatmap(seasonality_matrix, **heatmap_options)

ax.set_title('Day-of-Week Seasonality Pattern', fontsize=14, fontweight='bold')
ax.set_xlabel('Day of Week', fontsize=12)
ax.set_ylabel('City', fontsize=12)
plt.tight_layout()

seasonality_path = f"{output_dir}/seasonality_heatmap.png"
plt.savefig(seasonality_path, dpi=300, bbox_inches='tight')
plt.close()
print(f"   ✓ Seasonality heatmap: {seasonality_path}")

# 5. EXPORT SUMMARY REPORT
print("\n3. Creating Summary Report...")

report_path = f"{output_dir}/forecasting_report.txt"
# The report contains non-ASCII symbols (↑ → ↓). Without an explicit encoding
# open() uses the platform default, which can fail to encode them.
with open(report_path, 'w', encoding='utf-8') as f:
    f.write("="*80 + "\n")
    f.write("STG (SEASONAL TIME SERIES) FORECASTING REPORT\n")
    f.write("="*80 + "\n\n")

    f.write("EXECUTIVE SUMMARY\n")
    f.write("-"*80 + "\n")
    f.write(f"Analysis Date: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
    f.write(f"Total Cities Analyzed: {len(df_decomp)}\n")
    f.write(f"Total Deliveries Analyzed: {df_decomp['total_deliveries'].sum():,}\n")
    f.write(f"Analysis Period: {daily_stats['delivery_date'].min()} to {daily_stats['delivery_date'].max()}\n")
    f.write(f"Forecast Period: 7 days ahead\n\n")

    f.write("DECOMPOSITION ANALYSIS\n")
    f.write("-"*80 + "\n")
    f.write(df_decomp.to_string(index=False))
    f.write("\n\n")

    f.write("FORECAST SUMMARY (NEXT 7 DAYS)\n")
    f.write("-"*80 + "\n")
    # An empty forecast frame has no 'city' column, so groupby would raise;
    # write a placeholder line instead.
    if len(df_forecast) > 0:
        forecast_summary = df_forecast.groupby('city').agg({
            'forecast_deliveries': ['sum', 'mean', 'min', 'max']
        }).round(1)
        forecast_summary.columns = ['Total_7D', 'Avg_Daily', 'Min_Daily', 'Max_Daily']
        f.write(forecast_summary.to_string())
    else:
        f.write("No forecast records available")
    f.write("\n\n")

    f.write("TREND ANALYSIS\n")
    f.write("-"*80 + "\n")
    for _, row in df_decomp.iterrows():
        # Growth compares the recent 7-row average ("trend") with the city's
        # overall daily average. Measuring against min_daily, as before, could
        # never go negative, and the old threshold chain labelled 5-10% growth
        # as "DECLINING".
        baseline = row['avg_daily']
        if baseline > 0:
            growth = (row['trend'] - baseline) / baseline * 100
        else:
            growth = 0

        if growth > 10:
            status = "↑ GROWING"
        elif growth < -10:
            status = "↓ DECLINING"
        else:
            status = "→ STABLE"
        f.write(f"{row['city']:15} | Trend: {row['trend']:7.1f} | Growth: {growth:6.1f}% | {status}\n")
    f.write("\n")

    f.write("SEASONALITY INSIGHTS\n")
    f.write("-"*80 + "\n")
    for city in sorted(cities):
        city_data = daily_stats[daily_stats['city'] == city].copy()
        city_data['date_obj'] = pd.to_datetime(city_data['delivery_date'])
        city_data['day_of_week'] = city_data['date_obj'].dt.day_name()
        # Mean daily count for each weekday name.
        dow_pattern = city_data.groupby('day_of_week')['delivery_count'].mean()

        if len(dow_pattern) > 0:
            peak_day = dow_pattern.idxmax()
            peak_value = dow_pattern.max()
            low_day = dow_pattern.idxmin()
            low_value = dow_pattern.min()
            # Relative spread between the busiest and quietest weekday;
            # reported as 0 when the low value is not positive.
            variation = ((peak_value - low_value) / low_value * 100) if low_value > 0 else 0

            f.write(f"\n{city}:\n")
            f.write(f"  Peak: {peak_day:12} ({peak_value:7.0f} deliveries)\n")
            f.write(f"  Low:  {low_day:12} ({low_value:7.0f} deliveries)\n")
            f.write(f"  Variation: {variation:.1f}%\n")

print(f"   ✓ Summary report: {report_path}\n")

# 6. EXPORT FORECAST DETAILED
print("4. Exporting Detailed Forecast...")

forecast_detailed_path = f"{output_dir}/forecast_detailed.csv"

# Same records as df_forecast, with the date column converted to plain strings
# in a separate frame so df_forecast itself is left untouched.
df_forecast_detailed = df_forecast.assign(
    forecast_date=df_forecast['forecast_date'].astype(str)
)
df_forecast_detailed.to_csv(forecast_detailed_path, index=False)
print(f"   ✓ Detailed forecast: {forecast_detailed_path}\n")

# Closing banner followed by an inventory of everything written above.
banner_rule = "="*80
print(banner_rule)
print("✓ VISUALIZATION & EXPORT COMPLETE!")
print(banner_rule)
print(f"\nFiles exported to: {output_dir}")
print("\nGenerated files:")
for inventory_line in (
    "  1. decomposition_summary.csv - City-level metrics",
    "  2. forecast_7days.csv - 7-day forecast data",
    "  3. daily_stats.csv - Daily aggregated statistics",
    "  4. decomposition_analysis.png - 4-panel decomposition charts",
    "  5. forecast_7days_trend.png - Forecast trend line chart",
    "  6. seasonality_heatmap.png - Day-of-week patterns",
    "  7. forecast_detailed.csv - Detailed forecast records",
    "  8. forecasting_report.txt - Comprehensive text report",
):
    print(inventory_line)

=== VISUALIZATION & EXPORT ===

✓ Output directory: /home/sirin/BIGDATA/Optimize-Delivery/optimize/result

1. Exporting Data to CSV...
   ✓ Decomposition: /home/sirin/BIGDATA/Optimize-Delivery/optimize/result/decomposition_summary.csv
   ✓ Forecast: /home/sirin/BIGDATA/Optimize-Delivery/optimize/result/forecast_7days.csv
   ✓ Daily Stats: /home/sirin/BIGDATA/Optimize-Delivery/optimize/result/daily_stats.csv

2. Creating Visualizations...
   ✓ Decomposition visualization: /home/sirin/BIGDATA/Optimize-Delivery/optimize/result/decomposition_analysis.png
   ✓ Forecast trend visualization: /home/sirin/BIGDATA/Optimize-Delivery/optimize/result/forecast_7days_trend.png
   ✓ Seasonality heatmap: /home/sirin/BIGDATA/Optimize-Delivery/optimize/result/seasonality_heatmap.