In [1]:
import dask.dataframe as dd

In [2]:
# Загрузка данных (в качестве примера взята часть данных за 2008 год)
df = dd.read_csv('2008/2008.csv')

In [3]:
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 29 entries, Year to LateAircraftDelay
dtypes: object(4), float64(6), int64(19)

In [3]:
dtypes = {
    'ActualElapsedTime': 'float64',
    'AirTime': 'float64',
    'ArrDelay': 'float64',
    'ArrTime': 'float64',
    'CRSElapsedTime': 'float64',
    'CancellationCode': 'object',
    'DepDelay': 'float64',
    'DepTime': 'float64',
    'TaxiIn': 'float64',
    'TaxiOut': 'float64'
}

In [4]:
df = dd.read_csv('2008/2008.csv', dtype=dtypes)
df.head()

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,2008,1,3,4,1343.0,1325,1451.0,1435,WN,588,...,4.0,9.0,0,,0,16.0,0.0,0.0,0.0,0.0
1,2008,1,3,4,1125.0,1120,1247.0,1245,WN,1343,...,3.0,8.0,0,,0,,,,,
2,2008,1,3,4,2009.0,2015,2136.0,2140,WN,3841,...,2.0,14.0,0,,0,,,,,
3,2008,1,3,4,903.0,855,1203.0,1205,WN,3,...,5.0,7.0,0,,0,,,,,
4,2008,1,3,4,1423.0,1400,1726.0,1710,WN,25,...,6.0,10.0,0,,0,16.0,0.0,0.0,0.0,0.0


In [5]:
df = df.drop(['UniqueCarrier', 'Origin', 'Dest', 'CancellationCode', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay', 'TailNum'], axis=1)

In [7]:
df.tail(20)

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,FlightNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
794467,2008,4,17,4,1304.0,1250,1553.0,1536,1194,109.0,106.0,85.0,17.0,14.0,588,8.0,16.0,0,0
794468,2008,4,17,4,1655.0,1655,2340.0,2333,1194,285.0,278.0,252.0,7.0,0.0,2105,10.0,23.0,0,0
794469,2008,4,17,4,1747.0,1750,2012.0,2024,1195,265.0,274.0,246.0,-12.0,-3.0,1864,8.0,11.0,0,0
794470,2008,4,17,4,2128.0,2131,2244.0,2232,1195,136.0,121.0,103.0,12.0,-3.0,588,6.0,27.0,0,0
794471,2008,4,17,4,1117.0,1120,1348.0,1352,1197,391.0,392.0,362.0,-4.0,-3.0,2936,5.0,24.0,0,0
794472,2008,4,17,4,1844.0,1845,2126.0,2130,1198,162.0,165.0,138.0,-4.0,-1.0,950,15.0,9.0,0,0
794473,2008,4,17,4,1157.0,1200,1335.0,1337,1199,98.0,97.0,74.0,-2.0,-3.0,445,11.0,13.0,0,0
794474,2008,4,17,4,1441.0,1445,1632.0,1635,1200,111.0,110.0,89.0,-3.0,-4.0,576,6.0,16.0,0,0
794475,2008,4,17,4,612.0,615,811.0,819,1201,119.0,124.0,99.0,-8.0,-3.0,661,5.0,15.0,0,0
794476,2008,4,17,4,1940.0,1940,2128.0,2135,1202,108.0,115.0,91.0,-7.0,0.0,661,7.0,10.0,0,0


In [8]:
df.compute().info()

<class 'pandas.core.frame.DataFrame'>
Index: 2389217 entries, 0 to 794486
Data columns (total 19 columns):
 #   Column             Dtype  
---  ------             -----  
 0   Year               int64  
 1   Month              int64  
 2   DayofMonth         int64  
 3   DayOfWeek          int64  
 4   DepTime            float64
 5   CRSDepTime         int64  
 6   ArrTime            float64
 7   CRSArrTime         int64  
 8   FlightNum          int64  
 9   ActualElapsedTime  float64
 10  CRSElapsedTime     float64
 11  AirTime            float64
 12  ArrDelay           float64
 13  DepDelay           float64
 14  Distance           int64  
 15  TaxiIn             float64
 16  TaxiOut            float64
 17  Cancelled          int64  
 18  Diverted           int64  
dtypes: float64(9), int64(10)
memory usage: 364.6 MB


In [6]:
df = df.dropna()

In [14]:
# Вычисляем среднее время задержки вылета
mean_delay = df['DepDelay'].mean().compute()
print(f"Среднее время задержки вылета: {mean_delay:.2f} минут")

# Количество отмененных рейсов
cancelled_flights = df[df['Cancelled'] == 1].shape[0].compute()
print(f"Количество отмененных рейсов: {cancelled_flights}")

# Топ-5 маршрутов с максимальными задержками
top_routes = df.groupby(['Origin', 'Dest'])['DepDelay'].mean().nlargest(5).compute()
print("Топ-5 маршрутов с максимальными задержками:")
print(top_routes)

Среднее время задержки вылета: 11.44 минут
Количество отмененных рейсов: 64442
Топ-5 маршрутов с максимальными задержками:
Origin  Dest
SBN     CVG     518.0
SDF     SPI     329.0
HPN     PIA     298.0
TUL     PIA     243.0
ONT     SAN     221.0
Name: DepDelay, dtype: float64


In [7]:
y = df['DayOfWeek']

In [13]:
type(y)

dask.array.core.Array

In [8]:
x = df.drop('DayOfWeek', axis=1)

In [14]:
type(x)

dask.array.core.Array

In [9]:
x = x.to_dask_array()
y = y.to_dask_array()

In [22]:
x.shape

(2389217, 9)

In [23]:
y.shape

(2389217,)

In [10]:
x.compute_chunk_sizes()

Unnamed: 0,Array,Chunk
Bytes,318.48 MiB,106.64 MiB
Shape,"(2319121, 18)","(776558, 18)"
Dask graph,3 chunks in 6 graph layers,3 chunks in 6 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 318.48 MiB 106.64 MiB Shape (2319121, 18) (776558, 18) Dask graph 3 chunks in 6 graph layers Data type float64 numpy.ndarray",18  2319121,

Unnamed: 0,Array,Chunk
Bytes,318.48 MiB,106.64 MiB
Shape,"(2319121, 18)","(776558, 18)"
Dask graph,3 chunks in 6 graph layers,3 chunks in 6 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [11]:
y.compute_chunk_sizes()

Unnamed: 0,Array,Chunk
Bytes,17.69 MiB,5.92 MiB
Shape,"(2319121,)","(776558,)"
Dask graph,3 chunks in 6 graph layers,3 chunks in 6 graph layers
Data type,int64 numpy.ndarray,int64 numpy.ndarray
"Array Chunk Bytes 17.69 MiB 5.92 MiB Shape (2319121,) (776558,) Dask graph 3 chunks in 6 graph layers Data type int64 numpy.ndarray",2319121  1,

Unnamed: 0,Array,Chunk
Bytes,17.69 MiB,5.92 MiB
Shape,"(2319121,)","(776558,)"
Dask graph,3 chunks in 6 graph layers,3 chunks in 6 graph layers
Data type,int64 numpy.ndarray,int64 numpy.ndarray


In [74]:
df['DayOfWeek'].unique().compute()

0    4
1    5
2    6
3    7
4    1
5    2
6    3
Name: DayOfWeek, dtype: int64

In [29]:
from dask_ml.model_selection import ShuffleSplit
cv = ShuffleSplit(n_splits=5, test_size=0.3, train_size=0.7, random_state=0)
for train_idx, test_idx in cv.split(X=x, y=y):
    print(train_idx.compute())

[ 200804   55700  140489 ... 2212728 2038685 1899159]
[ 112116  718681  116671 ... 2332924 1706625 1990523]
[ 766376  225804  400088 ... 2116739 2125428 2331722]
[ 254924  169416  120110 ... 1954901 2034181 2023407]
[ 241991  602758  471322 ... 1813962 2165224 2153725]


In [52]:
from dask_ml.model_selection import KFold
cv = KFold(n_splits=5, random_state=0)

for train_idx, test_idx in cv.split(X=x, y=y):
    print(train_idx.compute())

[ 463825  463826  463827 ... 2319118 2319119 2319120]
[      0       1       2 ... 2319118 2319119 2319120]
[      0       1       2 ... 2319118 2319119 2319120]
[      0       1       2 ... 2319118 2319119 2319120]
[      0       1       2 ... 1855294 1855295 1855296]


In [47]:
from dask_ml.linear_model import LogisticRegression
lr = LogisticRegression(solver_kwargs={"normalize":False})
lr.fit(x.compute(), y.compute())

  return np.exp(A)


In [12]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import numpy as np
from joblib import Parallel, delayed, parallel_backend
import joblib
from dask.distributed import Client

client = Client(n_workers = 4, threads_per_worker = 1, memory_limit = '4GB')


# 2. Выбор модели
model = LogisticRegression(max_iter=200)

In [13]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import numpy as np
from joblib import Parallel, delayed, parallel_backend
import joblib
from dask.distributed import Client

client = Client(n_workers = 4, threads_per_worker = 1, memory_limit = '4GB')


# 2. Выбор модели
model = LogisticRegression(max_iter=200)
with parallel_backend('dask'):
    model.fit(x, y)
    k = 5
    scores = cross_val_score(model, x, y, cv=k)
    mean_score = np.mean(scores)
    std_deviation = np.std(scores)
    print(f"Средний результат на кросс-валидации: {mean_score:.2f}")
    print(f"Стандартное отклонение: {std_deviation:.2f}")

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver opt

Средний результат на кросс-валидации: 0.16
Стандартное отклонение: 0.01


In [82]:
from sklearn.model_selection import KFold
with parallel_backend('dask'):
    kf = KFold(n_splits=5)
    scores = cross_val_score(model, x, y, cv=kf)

    print("K-Fold CV Scores:", scores)
    print("Average Score:", scores.mean())

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver opt

K-Fold CV Scores: [0.15971325 0.15202103 0.14817689 0.14711615 0.16047251]
Average Score: 0.1534999656278011


In [84]:
from sklearn.model_selection import StratifiedKFold
with parallel_backend('dask'):
    skf = StratifiedKFold(n_splits=5)
    scores = cross_val_score(model, x, y, cv=skf)

    print("Stratified K-Fold CV Scores:", scores)
    print("Average Score:", scores.mean())

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver opt

Stratified K-Fold CV Scores: [0.15730071 0.1525169  0.16601771 0.15146262 0.15563231]
Average Score: 0.15658605104491957


In [86]:
from sklearn.model_selection import TimeSeriesSplit
with parallel_backend('dask'):
    # Этот пример может не быть совсем релевантным для датасета Iris, так как это не временной ряд.
    tscv = TimeSeriesSplit(n_splits=5)
    scores = cross_val_score(model, x, y, cv=tscv)

    print("Time Series Split CV Scores:", scores)
    print("Average Score:", scores.mean())


STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver opt

Time Series Split CV Scores: [0.15833333 0.14405723 0.13754269 0.14906603 0.16175877]
Average Score: 0.15015160923108764


In [13]:
from sklearn.model_selection import LeaveOneOut
with parallel_backend('dask'):
    loo = LeaveOneOut()
    scores = cross_val_score(model, x, y, cv=loo)

    print("Leave One Out CV Scores:", scores)
    print("Average Score:", scores.mean())

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver opt

In [14]:
from sklearn.model_selection import ShuffleSplit
with parallel_backend('dask'):
    ss = ShuffleSplit(n_splits=5, test_size=0.25)
    scores = cross_val_score(model, x, y, cv=ss)

    print("Shuffle Split CV Scores:", scores)
    print("Average Score:", scores.mean())

  return array[key] if axis == 0 else array[:, key]
  return array[key] if axis == 0 else array[:, key]
  return array[key] if axis == 0 else array[:, key]
  return array[key] if axis == 0 else array[:, key]
  return array[key] if axis == 0 else array[:, key]
  return array[key] if axis == 0 else array[:, key]
  return array[key] if axis == 0 else array[:, key]
  return array[key] if axis == 0 else array[:, key]


In [19]:
from sklearn.model_selection import GroupKFold
with parallel_backend('dask'):
# Для этого примера, мы будем использовать группы как дополнительный массив.
# Это может не быть совсем релевантным для датасета Iris без реальных групп.
    groups = np.arange(len(x)) // 10  # Просто пример группировки
    gkf = GroupKFold(n_splits=5)
    scores = cross_val_score(model, x, y, groups=groups, cv=gkf)

    print("Group K-Fold CV Scores:", scores)
    print("Average Score:", scores.mean())


STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver opt

Group K-Fold CV Scores: [0.1632969  0.16233749 0.16436082 0.16342331 0.16350308]
Average Score: 0.1633843212001469
