# Data Science with Python and Dask
## Chapter 6: Summarizing and Analyzing DataFrames

In [72]:
# Before beginning, set your working directory to where the data resides
import os
os.chdir('/Users/Abba/Documents/data-science-python-dask')

### Section 6.1.2

In [73]:
# Listing 6.1
import dask.dataframe as dd
import pyarrow
from dask.diagnostics import ProgressBar

nyc_data = dd.read_parquet('nyc_final', engine='pyarrow')

In [74]:
nyc_data.dtypes

Summons Number                                     uint32
Plate ID                                  string[pyarrow]
Registration State                        string[pyarrow]
Issue Date                           date32[day][pyarrow]
Violation Code                                     uint16
Vehicle Body Type                         string[pyarrow]
Vehicle Make                              string[pyarrow]
Issuing Agency                            string[pyarrow]
Street Code1                                       uint32
Street Code2                                       uint32
Street Code3                                       uint32
Vehicle Expiration Date                   string[pyarrow]
Violation Location                        string[pyarrow]
Violation Precinct                                float32
Issuer Precinct                                   float32
Issuer Code                                       float32
Issuer Command                            string[pyarrow]
Issuer Squad  

In [4]:
# Listing 6.2
with ProgressBar():
    vehicle_age_by_year = nyc_data['Vehicle Year'].value_counts().compute()
vehicle_age_by_year

[########################################] | 100% Completed | 968.24 ms


Vehicle Year
1973.0     977
1977.0    1276
1979.0    1887
1980.0    1996
1982.0    2164
          ... 
2022.0      93
2030.0      30
2048.0      33
2052.0      47
2054.0      61
Name: count, Length: 100, dtype: int64

In [5]:
# Listing 6.3
with ProgressBar():
    condition = (nyc_data['Vehicle Year'] > 0) & (nyc_data['Vehicle Year'] <= 2018)
    vehicle_age_by_year = nyc_data[condition]['Vehicle Year'].value_counts().compute().sort_index()
vehicle_age_by_year

[########################################] | 100% Completed | 987.79 ms


Vehicle Year
1970.0        775
1971.0        981
1972.0        971
1973.0        977
1974.0        786
1975.0        949
1976.0        910
1977.0       1276
1978.0       1481
1979.0       1887
1980.0       1996
1981.0       1877
1982.0       2164
1983.0       3277
1984.0       5351
1985.0      10365
1986.0      23731
1987.0      32087
1988.0      69377
1989.0      41674
1990.0      77031
1991.0      29877
1992.0      60789
1993.0      65870
1994.0      93404
1995.0     201606
1996.0     219047
1997.0     372041
1998.0     417273
1999.0     535844
2001.0     857291
2002.0    1001788
2003.0    1155400
2004.0    1345823
2005.0    1470302
2006.0    1600032
2007.0    1801081
2008.0    1528735
2009.0    1210117
2010.0    1362354
2011.0    1683653
2012.0    2070360
2013.0    2846851
2014.0    2733039
2015.0    2423990
2016.0    1280706
2017.0     297496
2018.0       2491
Name: count, dtype: int64

In [6]:
# Listing 6.4
nyc_data_filtered = nyc_data[condition]

def age_calculation(row):
    return int(row['Issue Date'].year - row['Vehicle Year'])

vehicle_age = nyc_data_filtered.apply(age_calculation, axis=1, meta=('Vehicle Age', 'int'))

nyc_data_vehicle_age_stg1 = nyc_data_filtered.assign(VehicleAge=vehicle_age)
nyc_data_vehicle_age_stg2 = nyc_data_vehicle_age_stg1.rename(columns={'VehicleAge':'Vehicle Age'})

nyc_data_with_vehicle_age = nyc_data_vehicle_age_stg2[nyc_data_vehicle_age_stg2['Vehicle Age'] >= 0]

In [7]:
# Listing 6.5
with ProgressBar():
    files = nyc_data_with_vehicle_age.to_parquet('nyc_data_vehicleAge', engine='pyarrow')

nyc_data_with_vehicle_age = dd.read_parquet('nyc_data_vehicleAge', engine='pyarrow')

[########################################] | 100% Completed | 433.93 s


In [8]:
# Listing 6.6
from dask.array import stats as dask_stats
with ProgressBar():
    mean = nyc_data_with_vehicle_age['Vehicle Age'].mean().compute()
    stdev = nyc_data_with_vehicle_age['Vehicle Age'].std().compute()
    minimum = nyc_data_with_vehicle_age['Vehicle Age'].min().compute()
    maximum = nyc_data_with_vehicle_age['Vehicle Age'].max().compute()
    skewness = float(dask_stats.skew(nyc_data_with_vehicle_age['Vehicle Age'].values).compute())

[########################################] | 100% Completed | 620.87 ms
[########################################] | 100% Completed | 849.93 ms
[########################################] | 100% Completed | 527.20 ms
[########################################] | 100% Completed | 533.10 ms
[########################################] | 100% Completed | 953.41 ms


### Section 6.1.3

In [9]:
# Listing 6.7
with ProgressBar():
    descriptive_stats = nyc_data_with_vehicle_age['Vehicle Age'].describe().compute()
descriptive_stats.round(2)

[########################################] | 100% Completed | 1.11 sms


count    28773482.00
mean            6.74
std             5.66
min             0.00
25%             2.00
50%             6.00
75%            11.00
max            47.00
Name: Vehicle Age, dtype: float64

### Section 6.2.2

In [10]:
# Listing 6.8
import pandas as pd
# this custom sort is only necessary if the monthYear column is a string.
years = ['2014', '2015', '2016', '2017']
months = ['01','02','03','04','05','06','07','08','09','10','11','12']
years_months = [year + month for year in years for month in months]

sort_order = pd.Series(range(len(years_months)), index=years_months, name='custom_sort')

def sort_by_months(dataframe, order):
    return dataframe.join(order).sort_values('custom_sort').drop('custom_sort', axis=1)

In [11]:
# Listing 6.9
with ProgressBar():
    nyc_data_by_month = nyc_data.groupby('monthYear')
    citations_per_month = nyc_data_by_month['Summons Number'].count().compute()
sort_by_months(citations_per_month.to_frame(), sort_order)

[########################################] | 100% Completed | 1.03 sms


Unnamed: 0_level_0,Summons Number
monthYear,Unnamed: 1_level_1
201401,703323
201402,641438
201403,899639
201404,879840
201405,941133
201406,940743
201407,961567
201408,901624
201409,1018933
201410,956967


In [14]:
# Listing 6.10
with ProgressBar():
    #condition = ~nyc_data['monthYear'].isin(['201707','201708','201709','201710','201711','201712'])# if monthYear is str
    condition = ~nyc_data['monthYear'].isin([201707,201708,201709,201710,201711,201712])# if monthYear is int
    nyc_data_filtered = nyc_data[condition]
    citations_and_temps = nyc_data_filtered.groupby('monthYear').agg({'Summons Number': 'count', 'Temp': 'mean'})
    correlation_matrix = citations_and_temps.corr().compute()
correlation_matrix

[########################################] | 100% Completed | 1.68 sms


Unnamed: 0,Summons Number,Temp
Summons Number,1.0,0.141743
Temp,0.141743,1.0


### Section 6.3.2

In [15]:
# Listing 6.11
nyc_data_with_vehicle_age = dd.read_parquet('nyc_data_vehicleAge', engine='pyarrow')

nyc_data_filtered = nyc_data_with_vehicle_age[nyc_data_with_vehicle_age ['Plate Type'].isin(['PAS','COM'])]

In [16]:
# Listing 6.12
with ProgressBar():
    N = nyc_data_filtered['Vehicle Age'].count().compute()
    p = nyc_data_filtered['Plate Type'].unique().count().compute()
brown_forsythe_left = (N - p) / (p - 1)

[########################################] | 100% Completed | 1.04 sms
[########################################] | 100% Completed | 715.46 ms


In [17]:
# Listing 6.13
with ProgressBar():
    passenger_vehicles = nyc_data_filtered[nyc_data_filtered['Plate Type'] == 'PAS']
    commercial_vehicles = nyc_data_filtered[nyc_data_filtered['Plate Type'] == 'COM']
    median_PAS = passenger_vehicles['Vehicle Age'].quantile(0.5).compute()
    median_COM = commercial_vehicles['Vehicle Age'].quantile(0.5).compute()

[########################################] | 100% Completed | 1.02 sms
[########################################] | 100% Completed | 821.96 ms


In [18]:
# Listing 6.14
def absolute_deviation_from_median(row):
    if row['Plate Type'] == 'PAS':
        return abs(row['Vehicle Age'] - median_PAS)
    else:
        return abs(row['Vehicle Age'] - median_COM)

In [19]:
# Listing 6.15
absolute_deviation = nyc_data_filtered.apply(absolute_deviation_from_median, axis=1, meta=('x', 'float32'))

nyc_data_age_type_test_stg1 = nyc_data_filtered.assign(MedianDifferences = absolute_deviation)
nyc_data_age_type_test = nyc_data_age_type_test_stg1.rename(columns={'MedianDifferences':'Median Difference'})

In [20]:
# Listing 6.16
with ProgressBar():
    group_means = nyc_data_age_type_test.groupby('Plate Type')['Median Difference'].mean().compute()

[########################################] | 100% Completed | 390.66 s


In [21]:
# Listing 6.17
def group_mean_variance(row):
    if row['Plate Type'] == 'PAS':
        return (row['Median Difference'] - group_means['PAS'])**2
    else:
        return (row['Median Difference'] - group_means['COM'])**2
    
group_mean_variances = nyc_data_age_type_test.apply(group_mean_variance, axis=1, meta=('x', 'float32'))

nyc_data_age_type_test_gmv_stg1 = nyc_data_age_type_test.assign(GroupMeanVariances = group_mean_variances)
nyc_data_age_type_test_gmv = nyc_data_age_type_test_gmv_stg1.rename(columns={'GroupMeanVariances':'Group Mean Variance'})

In [22]:
# Listing 6.18
with ProgressBar():
    brown_forsythe_right_denominator = nyc_data_age_type_test_gmv['Group Mean Variance'].sum().compute()

[########################################] | 100% Completed | 14m 5sss


In [23]:
# Listing 6.19
with ProgressBar():
    grand_mean = nyc_data_age_type_test['Median Difference'].mean().compute()

[########################################] | 100% Completed | 404.52 s


In [24]:
# Listing 6.20
brown_forsythe_aggregation = dd.Aggregation(
    'Brown_Forsythe',
    lambda chunk: (chunk.count(), chunk.sum()),
    lambda chunk_count, chunk_sum: (chunk_count.sum(), chunk_sum.sum()),
    lambda group_count, group_sum: group_count * (((group_sum / group_count) - grand_mean)**2)
)

In [25]:
# Listing 6.21
with ProgressBar():
    group_variances = nyc_data_age_type_test.groupby('Plate Type').agg({'Median Difference': brown_forsythe_aggregation}).compute()

[########################################] | 100% Completed | 425.91 s


In [26]:
# Listing 6.22
brown_forsythe_right_numerator = group_variances.sum()[0]

In [27]:
# Listing 6.23
F_statistic = brown_forsythe_left * (brown_forsythe_right_numerator / brown_forsythe_right_denominator)

In [28]:
# Listing 6.24
import scipy.stats as stats
alpha = 0.05
df1 = p - 1
df2 = N - p
F_critical = stats.f.ppf(q=1-alpha, dfn=df1, dfd=df2)

In [29]:
# Listing 6.25
print("Using the Brown-Forsythe Test for Equal Variance")
print("The Null Hypothesis states: the variance is constant among groups")
print("The Alternative Hypothesis states: the variance is not constant among groups")
print("At a confidence level of " + str(alpha) + ", the F statistic was " + str(F_statistic) + " and the F critical value was " + str(F_critical) + ".")
if F_statistic > F_critical:
    print("We can reject the null hypothesis. Set equal_var to False.")
else:
    print("We fail to reject the null hypothesis. Set equal_var to True.")

Using the Brown-Forsythe Test for Equal Variance
The Null Hypothesis states: the variance is constant among groups
The Alternative Hypothesis states: the variance is not constant among groups
At a confidence level of 0.05, the F statistic was 5634.353318745531 and the F critical value was 3.841459177055835.
We can reject the null hypothesis. Set equal_var to False.


In [30]:
# Listing 6.26
with ProgressBar():
    pas = passenger_vehicles['Vehicle Age'].values.compute()
    com = commercial_vehicles['Vehicle Age'].values.compute()

[########################################] | 100% Completed | 945.04 ms
[########################################] | 100% Completed | 919.11 ms


In [31]:
# Listing 6.27
stats.ttest_ind(pas, com, equal_var=False)

TtestResult(statistic=np.float64(-282.48265159548777), pvalue=np.float64(0.0), df=np.float64(10563639.6105696))

### Section 6.4.1

In [68]:
# Listing 6.28
nyc_data = dd.read_parquet('nyc_final', engine='pyarrow')

with ProgressBar():
    #condition = ~nyc_data['monthYear'].isin(['201707','201708','201709','201710','201711','201712'])
    condition = ~nyc_data['monthYear'].isin([201707,201708,201709,201710,201711,201712])
    nyc_data_filtered = nyc_data[condition]
    #citations_by_month = nyc_data_filtered.groupby(nyc_data_filtered.index)['Summons Number'].count()
    citations_by_month = nyc_data_filtered.groupby('monthYear')['Summons Number'].count()

In [69]:
%%time
citations_by_month.head(15)

CPU times: total: 6.06 s
Wall time: 1.32 s


monthYear
201401     703323
201402     641438
201403     899639
201404     879840
201405     941133
201406     940743
201407     961567
201408     901624
201409    1018933
201410     956967
201411     790246
201412     765599
201501    1381208
201502     723971
201503     948955
Name: Summons Number, dtype: int64

### Section 6.4.2

In [70]:
# Listing 6.29
with ProgressBar():
    three_month_SMA = citations_by_month.rolling(3).mean().compute()

[########################################] | 100% Completed | 1.30 sms


In [71]:
# Listing 6.30
citations_by_month.rolling(3, center=True).mean().head()

monthYear
201401              NaN
201402    748133.333333
201403    806972.333333
201404    906870.666667
201405    920572.000000
Name: Summons Number, dtype: float64