# TSIOT Basic Time Series Analysis

This notebook demonstrates basic time series analysis using TSIOT-generated synthetic data. We'll cover:

1. Connecting to TSIOT API
2. Generating synthetic time series
3. Basic statistical analysis
4. Visualization
5. Seasonal decomposition
6. Quality assessment
7. Summary report
8. Exporting results

## Prerequisites

Make sure you have TSIOT running locally or have access to a TSIOT instance.

In [None]:
# Install required packages
!pip install requests pandas numpy matplotlib seaborn scipy statsmodels

In [None]:
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from statsmodels.tsa.stattools import adfuller, acf, pacf
from statsmodels.tsa.seasonal import seasonal_decompose
import warnings
# Suppress every warning for the rest of the session to keep cell output clean.
# NOTE(review): this also hides deprecation and numerical warnings that can
# matter while debugging - consider narrowing the filter.
warnings.filterwarnings('ignore')

# Set style
# NOTE(review): the 'seaborn-v0_8' style name presumably requires a recent
# Matplotlib release - confirm against the installed version.
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
# Default size for figures that do not pass an explicit figsize.
plt.rcParams['figure.figsize'] = (12, 8)

## 1. Connect to TSIOT API

In [None]:
# TSIOT API configuration
#
# Both settings can be overridden through environment variables so that real
# credentials never have to be typed into (and saved with) the notebook.
# The fallbacks are the original hard-coded values.
import os

TSIOT_BASE_URL = os.environ.get("TSIOT_BASE_URL", "http://localhost:8080")
# Set TSIOT_API_KEY in the environment, or replace the placeholder below.
API_KEY = os.environ.get("TSIOT_API_KEY", "your-api-key-here")

# Headers sent with every authenticated API request in this notebook.
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_KEY}",
}

def check_tsiot_connection():
    """Probe the TSIOT ``/health`` endpoint and report the outcome.

    Prints a human-readable status line and returns ``True`` only when the
    endpoint answers with HTTP 200. Connection problems are reported and
    mapped to ``False`` instead of being raised.
    """
    health_url = f"{TSIOT_BASE_URL}/health"

    try:
        response = requests.get(health_url, timeout=5)
    except requests.exceptions.RequestException as e:
        print(f"❌ Failed to connect to TSIOT API: {e}")
        return False

    if response.status_code != 200:
        print(f"❌ TSIOT API returned status {response.status_code}")
        return False

    print("✅ Successfully connected to TSIOT API")
    return True


check_tsiot_connection()

In [None]:
def get_available_generators():
    """Fetch the list of available generators from the TSIOT API.

    Returns:
        The decoded JSON body of ``GET /api/v1/generators`` on HTTP 200,
        otherwise ``None``. Network errors, timeouts and undecodable bodies
        are reported with a printed message and also yield ``None``.
    """
    try:
        # A timeout keeps the notebook from hanging forever on a dead server.
        response = requests.get(
            f"{TSIOT_BASE_URL}/api/v1/generators",
            headers=headers,
            timeout=10,
        )
    except requests.exceptions.RequestException as e:
        print(f"Failed to get generators: {e}")
        return None

    if response.status_code != 200:
        print(f"Failed to get generators: {response.status_code}")
        return None

    try:
        return response.json()
    except ValueError as e:
        # Raised when the body is not valid JSON.
        print(f"Failed to get generators: invalid JSON response ({e})")
        return None


generators = get_available_generators()
if generators:
    print("Available generators:")
    for gen in generators.get('generators', []):
        # Tolerate entries that lack a name or description.
        print(f"  - {gen.get('name', '<unnamed>')}: {gen.get('description', '')}")

## 2. Generate Synthetic Time Series

In [None]:
def generate_time_series(generator_type, length, parameters=None, timeout=30):
    """Generate a time series using the TSIOT API.

    Args:
        generator_type: Generator identifier sent as the ``type`` field.
        length: Requested number of points, sent as the ``length`` field.
        parameters: Optional dict of generator parameters; an empty dict is
            sent when omitted.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body of ``POST /api/v1/generate`` on HTTP 200,
        otherwise ``None``. Failures (non-200 status, network error,
        timeout, undecodable body) are reported with a printed message.
    """
    data = {
        "type": generator_type,
        "length": length,
        "parameters": parameters or {},
    }

    try:
        response = requests.post(
            f"{TSIOT_BASE_URL}/api/v1/generate",
            json=data,
            headers=headers,
            timeout=timeout,
        )
    except requests.exceptions.RequestException as e:
        # Connection errors and timeouts follow the same "return None"
        # contract as an HTTP error instead of aborting the cell.
        print(f"Generation failed: {e}")
        return None

    if response.status_code != 200:
        print(f"Generation failed: {response.status_code}")
        print(response.text)
        return None

    try:
        return response.json()
    except ValueError as e:
        # Raised when the body is not valid JSON.
        print(f"Generation failed: invalid JSON response ({e})")
        return None

# Generate different types of time series
print("Generating synthetic time series...")

# 1. ARIMA series
arima_data = generate_time_series(
    "arima",
    500,
    {
        "ar_params": [0.5, -0.3],
        "ma_params": [0.2],
        "mean": 100,
        "variance": 25
    }
)

# 2. LSTM series
lstm_data = generate_time_series(
    "lstm",
    500,
    {
        "trend": 0.1,
        "seasonality": 24,
        "noise": 0.05
    }
)

# 3. Random walk
rw_data = generate_time_series(
    "random_walk",
    500,
    {
        "drift": 0.02,
        "volatility": 1.0
    }
)

# generate_time_series returns None when a request fails, so report the real
# outcome instead of unconditionally claiming success.
generated = {'ARIMA': arima_data, 'LSTM': lstm_data, 'Random Walk': rw_data}
failed = [label for label, payload in generated.items() if not payload]

if not failed:
    print("✅ Generated 3 time series successfully!")
else:
    print(
        f"⚠️ Generated {len(generated) - len(failed)} of {len(generated)} "
        f"time series; failed: {', '.join(failed)}"
    )

In [None]:
# Convert to pandas DataFrames
def to_dataframe(tsiot_response, name):
    """Convert a TSIOT response to a pandas DataFrame.

    Args:
        tsiot_response: Mapping read through its ``'values'`` and optional
            ``'timestamps'`` keys. A falsy response (``None``, ``{}``) is
            treated as "no data".
        name: Label written to the ``series`` column of every row.

    Returns:
        A DataFrame indexed by ``timestamp`` with columns ``value`` and
        ``series``, or ``None`` when ``tsiot_response`` is falsy.
    """
    if not tsiot_response:
        return None

    values = tsiot_response.get('values', [])
    timestamps = tsiot_response.get('timestamps', [])

    if not timestamps:
        # Create synthetic hourly timestamps if not provided. The step is a
        # Timedelta rather than the 'H' alias: 'H' is deprecated in recent
        # pandas, while the lower-case 'h' is unknown to older releases.
        timestamps = pd.date_range(
            start='2023-01-01',
            periods=len(values),
            freq=pd.Timedelta(hours=1),
        )
    else:
        timestamps = pd.to_datetime(timestamps)

    return pd.DataFrame({
        'timestamp': timestamps,
        'value': values,
        'series': name
    }).set_index('timestamp')

# Build one DataFrame per generated series
df_arima, df_lstm, df_rw = (
    to_dataframe(payload, label)
    for payload, label in (
        (arima_data, 'ARIMA'),
        (lstm_data, 'LSTM'),
        (rw_data, 'Random Walk'),
    )
)

# Stack the per-series frames row-wise; the 'series' column tells them apart
df_combined = pd.concat([df_arima, df_lstm, df_rw])

print(f"DataFrame shape: {df_combined.shape}")
first_ts, last_ts = df_combined.index.min(), df_combined.index.max()
print(f"Date range: {first_ts} to {last_ts}")

## 3. Basic Statistical Analysis

In [None]:
# Calculate basic statistics for each series
def calculate_basic_stats(df):
    """Summarise the ``value`` column of a time-series frame.

    Returns a dict with the row count of ``df`` plus the mean, standard
    deviation, minimum, maximum, median, skewness and kurtosis of
    ``df['value']`` as computed by the corresponding pandas methods.
    """
    values = df['value']
    return {
        'count': len(df),
        'mean': values.mean(),
        'std': values.std(),
        'min': values.min(),
        'max': values.max(),
        'median': values.median(),
        'skewness': values.skew(),
        'kurtosis': values.kurtosis(),
    }

# Per-series summary statistics, keyed by series label
series_stats = {}
for series_name in ('ARIMA', 'LSTM', 'Random Walk'):
    belongs_to_series = df_combined['series'] == series_name
    series_df = df_combined[belongs_to_series]
    series_stats[series_name] = calculate_basic_stats(series_df)

# Transpose so that each series is a row and each statistic a column
stats_df = pd.DataFrame(series_stats).transpose()
print("📊 Basic Statistics Summary:")
print(stats_df.round(4))

In [None]:
# Statistical tests
def perform_statistical_tests(series, name):
    """Run stationarity, normality and autocorrelation tests on a series.

    Missing values are dropped before testing. ``name`` is accepted for
    interface symmetry but is not used.

    Returns a dict with:
        adf_statistic, adf_pvalue, is_stationary: Augmented Dickey-Fuller
            result; flagged stationary when the p-value is below 0.05.
        jb_statistic, jb_pvalue, is_normal: Jarque-Bera result; flagged
            normal when the p-value is above 0.05.
        lb_pvalue_min, has_autocorr: smallest Ljung-Box p-value returned
            for ``lags=10``; autocorrelation is flagged when it is below 0.05.
    """
    from statsmodels.stats.diagnostic import acorr_ljungbox

    observed = series.dropna()

    # Augmented Dickey-Fuller test for stationarity
    adf_statistic, adf_pvalue = adfuller(observed)[:2]

    # Jarque-Bera test for normality
    jb_stat, jb_pvalue = stats.jarque_bera(observed)

    # Ljung-Box test for autocorrelation
    ljung_box = acorr_ljungbox(observed, lags=10, return_df=True)
    smallest_lb_pvalue = ljung_box['lb_pvalue'].min()

    return {
        'adf_statistic': adf_statistic,
        'adf_pvalue': adf_pvalue,
        'is_stationary': adf_pvalue < 0.05,
        'jb_statistic': jb_stat,
        'jb_pvalue': jb_pvalue,
        'is_normal': jb_pvalue > 0.05,
        'lb_pvalue_min': smallest_lb_pvalue,
        'has_autocorr': smallest_lb_pvalue < 0.05,
    }

# Run the statistical tests on the value column of every series
test_results = {}
for series_name in ('ARIMA', 'LSTM', 'Random Walk'):
    belongs_to_series = df_combined['series'] == series_name
    series_df = df_combined[belongs_to_series]
    series_values = series_df['value']
    test_results[series_name] = perform_statistical_tests(series_values, series_name)

# Transpose so that each series is a row and each test result a column
tests_df = pd.DataFrame(test_results).transpose()
print("🧪 Statistical Tests Results:")
print(tests_df)

## 4. Visualization

In [None]:
# One stacked panel per series, annotated with its mean and standard deviation
series_names = ['ARIMA', 'LSTM', 'Random Walk']
colors = ['blue', 'green', 'red']

fig, axes = plt.subplots(nrows=3, ncols=1, figsize=(15, 12))

for i, (series_name, color) in enumerate(zip(series_names, colors)):
    ax = axes[i]
    series_df = df_combined[df_combined['series'] == series_name]

    ax.plot(series_df.index, series_df['value'], color=color, alpha=0.8, linewidth=1)
    ax.set_title(f'{series_name} Time Series', fontsize=14, fontweight='bold')
    ax.set_ylabel('Value')
    ax.grid(True, alpha=0.3)

    # Summary statistics in the top-left corner (axes-relative coordinates)
    summary = series_stats[series_name]
    stats_text = f"Mean: {summary['mean']:.2f}, Std: {summary['std']:.2f}"
    text_box = {'boxstyle': 'round', 'facecolor': 'white', 'alpha': 0.8}
    ax.text(0.02, 0.95, stats_text, transform=ax.transAxes, bbox=text_box)

# Only the bottom panel carries the shared x-axis label
axes[-1].set_xlabel('Time')
plt.tight_layout()
plt.show()

In [None]:
# Top row: histograms; bottom row: normal Q-Q plots (one column per series)
fig, axes = plt.subplots(nrows=2, ncols=3, figsize=(18, 10))

for i, series_name in enumerate(series_names):
    series_df = df_combined[df_combined['series'] == series_name]
    values = series_df['value']
    hist_ax = axes[0, i]
    qq_ax = axes[1, i]

    # Histogram
    hist_ax.hist(values, bins=30, alpha=0.7, color=colors[i], edgecolor='black')
    hist_ax.set_title(f'{series_name} - Distribution')
    hist_ax.set_ylabel('Frequency')
    hist_ax.grid(True, alpha=0.3)

    # Q-Q plot against the normal distribution
    stats.probplot(values, dist="norm", plot=qq_ax)
    qq_ax.set_title(f'{series_name} - Q-Q Plot')
    qq_ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# ACF (left column) and PACF (right column) for every series, 40 lags each
fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(15, 12))

for i, series_name in enumerate(series_names):
    series_df = df_combined[df_combined['series'] == series_name]
    values = series_df['value'].dropna()

    # Dashed red lines are drawn at +/- 1.96 / sqrt(number of observations)
    bound = 1.96 / np.sqrt(len(values))

    panels = (
        (axes[i, 0], acf, 'Autocorrelation', 'ACF'),
        (axes[i, 1], pacf, 'Partial Autocorrelation', 'PACF'),
    )
    for ax, estimator, title, ylabel in panels:
        ax.plot(estimator(values, nlags=40), marker='o', markersize=3)
        ax.axhline(y=0, color='black', linestyle='-', alpha=0.3)
        ax.axhline(y=bound, color='red', linestyle='--', alpha=0.5)
        ax.axhline(y=-bound, color='red', linestyle='--', alpha=0.5)
        ax.set_title(f'{series_name} - {title}')
        ax.set_ylabel(ylabel)
        ax.grid(True, alpha=0.3)

# Only the bottom row carries the x-axis label
for bottom_ax in axes[-1]:
    bottom_ax.set_xlabel('Lag')
plt.tight_layout()
plt.show()

## 5. Seasonal Decomposition

In [None]:
# Additive decomposition of the LSTM series, which was requested with a
# seasonality of 24
lstm_series = df_combined.loc[df_combined['series'] == 'LSTM', 'value']

if len(lstm_series) < 48:  # Fewer than 2 full cycles of the 24-step season
    print("⚠️ Not enough data points for seasonal decomposition")
else:
    decomposition = seasonal_decompose(lstm_series, model='additive', period=24)

    fig, axes = plt.subplots(4, 1, figsize=(15, 12))

    components = (
        (decomposition.observed, 'Original Series'),
        (decomposition.trend, 'Trend Component'),
        (decomposition.seasonal, 'Seasonal Component'),
        (decomposition.resid, 'Residual Component'),
    )
    for ax, (component, title) in zip(axes, components):
        component.plot(ax=ax, title=title)
        ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Variance carried by each component
    trend_var = decomposition.trend.var()
    seasonal_var = decomposition.seasonal.var()
    noise_var = decomposition.resid.var()

    print("📊 Seasonal Decomposition Statistics:")
    print(f"Trend variance: {trend_var:.4f}")
    print(f"Seasonal variance: {seasonal_var:.4f}")
    print(f"Residual variance: {noise_var:.4f}")

    # Signal-to-noise ratio: (trend + seasonal) variance over residual variance
    signal_var = trend_var + seasonal_var
    snr = signal_var / noise_var
    print(f"Signal-to-Noise Ratio: {snr:.4f}")

## 6. Quality Assessment

In [None]:
def assess_time_series_quality(series, name):
    """Assess the quality of a time series.

    Args:
        series: pandas Series of observations. Its index is used to check
            how regular the sampling intervals are.
        name: Label of the series. Not used by the computation; kept so
            existing callers keep working.

    Returns:
        dict with the keys
        ``missing_count`` / ``missing_percentage``: NaN entries,
        ``outlier_count`` / ``outlier_percentage``: points outside
            ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]``,
        ``coefficient_of_variation``: ``std / mean`` (not finite when the
            mean is zero),
        ``irregular_intervals``: number of index steps that differ from the
            most common step (0 when there are fewer than two points),
        ``overall_quality_score``: ``100 - 2*missing% - outlier%``, floored
            at 0.
    """
    quality_metrics = {}
    total = len(series)

    def as_percentage(count):
        # Avoid dividing by zero for an empty series; NaN propagates into
        # the score, which the max() below then floors at 0.
        return (count / total) * 100 if total else float('nan')

    # Missing values
    missing = series.isna().sum()
    quality_metrics['missing_count'] = missing
    quality_metrics['missing_percentage'] = as_percentage(missing)

    # Outliers (using IQR method)
    Q1 = series.quantile(0.25)
    Q3 = series.quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    outliers = ((series < lower_bound) | (series > upper_bound)).sum()
    quality_metrics['outlier_count'] = outliers
    quality_metrics['outlier_percentage'] = as_percentage(outliers)

    # Data consistency
    quality_metrics['coefficient_of_variation'] = series.std() / series.mean()

    # Temporal consistency (assuming regular intervals)
    quality_metrics['irregular_intervals'] = 0
    if hasattr(series, 'index') and hasattr(series.index, 'to_series'):
        time_diffs = series.index.to_series().diff().dropna()
        # With fewer than two points there are no steps at all, so mode()
        # is empty and indexing it would raise.
        if not time_diffs.empty:
            typical_step = time_diffs.mode().iloc[0]
            quality_metrics['irregular_intervals'] = (time_diffs != typical_step).sum()

    # Overall quality score (simple heuristic)
    quality_score = 100
    quality_score -= quality_metrics['missing_percentage'] * 2  # Penalize missing values
    quality_score -= quality_metrics['outlier_percentage']      # Penalize outliers
    quality_score = max(0, quality_score)  # Ensure non-negative
    quality_metrics['overall_quality_score'] = quality_score

    return quality_metrics

# Quality metrics for the value column of every series
quality_results = {}
for series_name in series_names:
    belongs_to_series = df_combined['series'] == series_name
    series_df = df_combined[belongs_to_series]
    series_values = series_df['value']
    quality_results[series_name] = assess_time_series_quality(series_values, series_name)

# Transpose so that each series is a row and each metric a column
quality_df = pd.DataFrame(quality_results).transpose()
print("🎯 Quality Assessment Results:")
print(quality_df.round(2))

In [None]:
# 2x2 grid of bar charts comparing one quality metric per panel
fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(15, 10))

# (axes, metric key in quality_results, panel title, y-axis label)
panels = (
    (axes[0, 0], 'overall_quality_score', 'Overall Quality Scores', 'Quality Score'),
    (axes[0, 1], 'missing_percentage', 'Missing Values (%)', 'Missing %'),
    (axes[1, 0], 'outlier_percentage', 'Outliers (%)', 'Outlier %'),
    (axes[1, 1], 'coefficient_of_variation', 'Coefficient of Variation', 'CV'),
)

for ax, metric, title, ylabel in panels:
    heights = [quality_results[name][metric] for name in series_names]
    ax.bar(series_names, heights, color=colors)
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.grid(True, alpha=0.3)

# The score is bounded to 0-100, so pin its axis to the full range
axes[0, 0].set_ylim(0, 100)

plt.tight_layout()
plt.show()

## 7. Summary Report

In [None]:
# Generate comprehensive summary report
#
# Prints a header, the per-series characteristics gathered in the earlier
# cells (series_stats, test_results, quality_results) and a list of
# rule-based recommendations.
print("\n" + "="*80)
print("📋 TSIOT TIME SERIES ANALYSIS SUMMARY REPORT")
print("="*80)

print(f"\n📅 Analysis Date: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"📊 Number of Series Analyzed: {len(series_names)}")
print(f"📏 Data Points per Series: {len(df_arima)}")
print(f"⏱️ Time Range: {df_combined.index.min().strftime('%Y-%m-%d')} to {df_combined.index.max().strftime('%Y-%m-%d')}")

print("\n" + "-"*50)
print("📈 SERIES CHARACTERISTICS")
print("-"*50)

for series_name in series_names:
    # Deliberately NOT named `stats`: that would rebind the notebook-global
    # `scipy.stats` import and break earlier cells when they are re-run.
    basic_stats = series_stats[series_name]
    tests = test_results[series_name]
    quality = quality_results[series_name]

    print(f"\n🔸 {series_name} Series:")
    print(f"   Mean: {basic_stats['mean']:.2f} ± {basic_stats['std']:.2f}")
    print(f"   Range: [{basic_stats['min']:.2f}, {basic_stats['max']:.2f}]")
    print(f"   Stationarity: {'✅ Stationary' if tests['is_stationary'] else '❌ Non-stationary'}")
    print(f"   Normality: {'✅ Normal' if tests['is_normal'] else '❌ Non-normal'}")
    print(f"   Autocorrelation: {'✅ Present' if tests['has_autocorr'] else '❌ Absent'}")
    print(f"   Quality Score: {quality['overall_quality_score']:.1f}/100")

    if quality['outlier_count'] > 0:
        print(f"   ⚠️ Outliers detected: {quality['outlier_count']} ({quality['outlier_percentage']:.1f}%)")

print("\n" + "-"*50)
print("🎯 RECOMMENDATIONS")
print("-"*50)

for series_name in series_names:
    tests = test_results[series_name]
    quality = quality_results[series_name]

    print(f"\n🔸 {series_name} Series:")

    if not tests['is_stationary']:
        print("   📌 Apply differencing or detrending for stationarity")

    if not tests['is_normal']:
        print("   📌 Consider transformation (log, box-cox) for normality")

    if tests['has_autocorr']:
        print("   📌 Suitable for ARIMA/LSTM modeling")

    if quality['outlier_percentage'] > 5:
        print("   📌 Investigate and possibly remove outliers")

    # Score bands: above 90 high, above 70 good, otherwise poor
    if quality['overall_quality_score'] > 90:
        print("   ✅ High quality - ready for analysis")
    elif quality['overall_quality_score'] > 70:
        print("   ⚠️ Good quality - minor preprocessing needed")
    else:
        print("   ❌ Poor quality - significant preprocessing required")

print("\n" + "="*80)
print("✅ ANALYSIS COMPLETE")
print("="*80)

## 8. Export Results

In [None]:
# Write every result table to a timestamped CSV file
import os
from datetime import datetime

# Target directory, created on first use
output_dir = "analysis_results"
os.makedirs(output_dir, exist_ok=True)

# A shared timestamp keeps the files of one run together
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# (file-name stem, table to export)
exports = (
    ("time_series_data", df_combined),
    ("basic_statistics", stats_df),
    ("statistical_tests", tests_df),
    ("quality_assessment", quality_df),
)

file_names = []
for stem, frame in exports:
    file_name = f"{stem}_{timestamp}.csv"
    frame.to_csv(f"{output_dir}/{file_name}")
    file_names.append(file_name)

print(f"✅ Results exported to {output_dir}/ directory")
print(f"📁 Files created:")
for file_name in file_names:
    print(f"   - {file_name}")

## Next Steps

This notebook walked through a basic end-to-end analysis of TSIOT-generated synthetic time series. Here are some suggested next steps:

1. **Advanced Analysis**: Explore advanced notebooks for specific use cases:
   - `anomaly_detection.ipynb` - Detect anomalies in time series
   - `forecasting_models.ipynb` - Build forecasting models
   - `feature_engineering.ipynb` - Extract meaningful features

2. **Model Development**: Use the synthetic data to train and validate time series models

3. **Quality Improvement**: If quality scores are low, consider:
   - Adjusting generator parameters
   - Applying preprocessing techniques
   - Using different generator types

4. **Integration**: Integrate TSIOT into your data pipeline for continuous synthetic data generation

5. **Validation**: Compare synthetic data characteristics with real-world data to ensure realism

For more information, visit the [TSIOT documentation](../../docs/) or explore other example notebooks in this directory.