* https://www.gov.uk/guidance/mot-inspection-manual-for-private-passenger-and-light-commercial-vehicles
* https://www.data.gov.uk/dataset/e3939ef8-30c7-4ca8-9c7c-ad9475cc9b2f/anonymised-mot-tests-and-results
* https://www.gov.uk/government/news/mot-changes-20-may-2018

In [1]:
from dask.distributed import Client
from distributed import progress
from glob import glob
from operator import itemgetter
import dask.dataframe as dd
import csv
import pandas as pd

In [2]:
client = Client()
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 12,Total memory: 31.30 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:45099,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 12
Started: Just now,Total memory: 31.30 GiB

0,1
Comm: tcp://127.0.0.1:45391,Total threads: 3
Dashboard: http://127.0.0.1:33773/status,Memory: 7.83 GiB
Nanny: tcp://127.0.0.1:33055,
Local directory: /tmp/dask-scratch-space/worker-d3wf25lt,Local directory: /tmp/dask-scratch-space/worker-d3wf25lt

0,1
Comm: tcp://127.0.0.1:38835,Total threads: 3
Dashboard: http://127.0.0.1:42541/status,Memory: 7.83 GiB
Nanny: tcp://127.0.0.1:39147,
Local directory: /tmp/dask-scratch-space/worker-vr18wfnk,Local directory: /tmp/dask-scratch-space/worker-vr18wfnk

0,1
Comm: tcp://127.0.0.1:44005,Total threads: 3
Dashboard: http://127.0.0.1:37087/status,Memory: 7.83 GiB
Nanny: tcp://127.0.0.1:36317,
Local directory: /tmp/dask-scratch-space/worker-z_zzip14,Local directory: /tmp/dask-scratch-space/worker-z_zzip14

0,1
Comm: tcp://127.0.0.1:32955,Total threads: 3
Dashboard: http://127.0.0.1:35299/status,Memory: 7.83 GiB
Nanny: tcp://127.0.0.1:37799,
Local directory: /tmp/dask-scratch-space/worker-3cxq6bjq,Local directory: /tmp/dask-scratch-space/worker-3cxq6bjq


In [3]:
txt_files = glob('data/test_*.txt')
csv_files = glob('data/unzipped/**/*.csv', recursive=True)

files = txt_files + csv_files
len(files)

104

In [4]:
def sniff_dialect(file):
    with open(file) as csvfile:
        dialect = csv.Sniffer().sniff(csvfile.read(10000))
        dialect_info = dict(dialect.__dict__)
        dialect_info['file'] = file
    return dialect_info

In [5]:
dialects = [sniff_dialect(file) for file in files]

csv_info_df = (
    pd.DataFrame(dialects)
    .drop(columns=['__module__', '_name', '__doc__'])
    .set_index('file')
    .sort_index()
    .reset_index()
)

csv_info_df

Unnamed: 0,file,lineterminator,quoting,doublequote,delimiter,quotechar,skipinitialspace
0,data/test_item_2005.txt,\r\n,0,False,|,"""",False
1,data/test_item_2006.txt,\r\n,0,False,|,"""",False
2,data/test_item_2007.txt,\r\n,0,False,|,"""",False
3,data/test_item_2008.txt,\r\n,0,False,|,"""",False
4,data/test_item_2009.txt,\r\n,0,False,|,"""",False
...,...,...,...,...,...,...,...
99,data/unzipped/test_result_31869.csv,\r\n,0,False,|,"""",False
100,data/unzipped/test_result_31870.csv,\r\n,0,False,|,"""",False
101,data/unzipped/test_result_31871.csv,\r\n,0,False,|,"""",False
102,data/unzipped/test_result_31876.csv,\r\n,0,False,|,"""",False


In [6]:
csv_info_df.lineterminator.describe()

count      104
unique       1
top       \r\n
freq       104
Name: lineterminator, dtype: object

In [7]:
csv_info_df.skipinitialspace.describe()

count       104
unique        1
top       False
freq        104
Name: skipinitialspace, dtype: object

In [8]:
csv_info_df.delimiter.unique()

array(['|', ','], dtype=object)

In [9]:
results_files_df = csv_info_df[csv_info_df.file.str.contains('result')]
results_files_df

Unnamed: 0,file,lineterminator,quoting,doublequote,delimiter,quotechar,skipinitialspace
12,data/test_result_2005.txt,\r\n,0,False,|,"""",False
13,data/test_result_2006.txt,\r\n,0,False,|,"""",False
14,data/test_result_2007.txt,\r\n,0,False,|,"""",False
15,data/test_result_2008.txt,\r\n,0,False,|,"""",False
16,data/test_result_2009.txt,\r\n,0,False,|,"""",False
17,data/test_result_2010.txt,\r\n,0,False,|,"""",False
18,data/test_result_2011.txt,\r\n,0,False,|,"""",False
19,data/test_result_2012.txt,\r\n,0,False,|,"""",False
20,data/test_result_2013.txt,\r\n,0,False,|,"""",False
21,data/test_result_2014.txt,\r\n,0,False,|,"""",False


## Results

In [10]:
def read_results(file, sep):
    df = dd.read_csv(file,
                     delimiter=sep,
                     doublequote=False,
                     on_bad_lines='warn',
                     include_path_column=True,
                     parse_dates=['test_date'],
                     dtype_backend='pyarrow',
                     # engine='pyarrow',
                    )
    return df

def parse_dates_pd(df):
    df['first_use_date'] = pd.to_datetime(df['first_use_date'], format='ISO8601', errors='coerce')
    return df

def parse_dates_dd(df):
    df['first_use_date'] = dd.to_datetime(df['first_use_date'], format='ISO8601', errors='coerce')
    return df

In [11]:
result_records = results_files_df[['file', 'delimiter']].to_records(index=False)
ddfs = [read_results(file, sep) for file, sep in result_records]

In [12]:
ddf = (
    dd.concat(ddfs)
    #.map_partitions(parse_dates_pd) # pandas version
    .pipe(parse_dates_dd) # dask version
)

In [13]:
ddf.dtypes

test_id               int64[pyarrow]
vehicle_id            int64[pyarrow]
test_date             datetime64[ns]
test_class_id         int64[pyarrow]
test_type            string[pyarrow]
test_result          string[pyarrow]
test_mileage          int64[pyarrow]
postcode_area        string[pyarrow]
make                 string[pyarrow]
model                string[pyarrow]
colour               string[pyarrow]
fuel_type            string[pyarrow]
cylinder_capacity     int64[pyarrow]
first_use_date        datetime64[ns]
path                        category
dtype: object

Task exception was never retrieved
future: <Task finished name='Task-2086' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2080' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2022' cor

Task exception was never retrieved
future: <Task finished name='Task-2810' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2807' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2852' cor

Task exception was never retrieved
future: <Task finished name='Task-2811' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2221' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2250' cor

Task exception was never retrieved
future: <Task finished name='Task-2171' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-1986' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-1964' cor

Task exception was never retrieved
future: <Task finished name='Task-2068' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2008' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2095' cor

Task exception was never retrieved
future: <Task finished name='Task-2099' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-1921' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2281' cor

Task exception was never retrieved
future: <Task finished name='Task-3590' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-3587' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-3571' cor

Task exception was never retrieved
future: <Task finished name='Task-3604' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2445' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2309' cor

Task exception was never retrieved
future: <Task finished name='Task-2867' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2833' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-3565' cor

Task exception was never retrieved
future: <Task finished name='Task-2331' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2446' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2345' cor

Task exception was never retrieved
future: <Task finished name='Task-2844' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2860' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2815' cor

Task exception was never retrieved
future: <Task finished name='Task-2193' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2232' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2284' cor

Task exception was never retrieved
future: <Task finished name='Task-1993' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2049' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2156' cor

Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/IPython/core/interactiveshell.py", line 3508, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_93725/1354886235.py", line 1, in <module>
    ddf.info(verbose=True)
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/dask/dataframe/core.py", line 6054, in info
    zip(computations.keys(), da.compute(*computations.values()))
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/dask/base.py", line 599, in compute
    results = schedule(dsk, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 3226, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
              

Task exception was never retrieved
future: <Task finished name='Task-2925' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2921' coro=<Client._gather.<locals>.wait() done, defined at /home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py:2189> exception=AllExit()>
Traceback (most recent call last):
  File "/home/giles/.miniconda3/envs/pydatalondon2023/lib/python3.11/site-packages/distributed/client.py", line 2198, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2915' cor

In [14]:
ddf.head()

Unnamed: 0,test_id,vehicle_id,test_date,test_class_id,test_type,test_result,test_mileage,postcode_area,make,model,colour,fuel_type,cylinder_capacity,first_use_date,path
0,804664368,256274986,2005-01-01,0,NT,P,23459,TF,FORD,UNCLASSIFIED,SILVER,PE,,NaT,/home/giles/Projects/DS/PyData/mot_pandas2_pol...
1,392603376,633988704,2005-01-01,0,NT,P,40961,E,LOTUS,UNCLASSIFIED,RED,PE,,NaT,/home/giles/Projects/DS/PyData/mot_pandas2_pol...
2,1894843206,1320781748,2005-01-01,0,NT,P,16416,S,VAUXHALL,UNCLASSIFIED,BLUE,PE,,NaT,/home/giles/Projects/DS/PyData/mot_pandas2_pol...
3,830908928,1263031090,2005-01-01,4,NT,P,93318,W,LAND ROVER,109 V8 S.W.,BLUE,PE,3528.0,1981-04-06,/home/giles/Projects/DS/PyData/mot_pandas2_pol...
4,727535460,1123257842,2005-01-01,4,NT,P,121930,RG,CITROEN,AX,WHITE,DI,1360.0,1993-08-31,/home/giles/Projects/DS/PyData/mot_pandas2_pol...


In [20]:
f = ddf.to_parquet('test_result.parquet',
                   #compression='snappy',
                   write_index=False, overwrite=True, compute=False).persist()
progress(f)

VBox()

In [21]:
import pyarrow as pa
import pyarrow.parquet as pq
# table = pa.Table.from_pydict({'x': list(range(100000))})
# pq.write_table(table, '/tmp/foo.parquet')
pq.ParquetFile('test_result.parquet/part.0.parquet').metadata.row_group(0).column(0).compression
# 'SNAPPY'

FileNotFoundError: [Errno 2] Failed to open local file 'test_result.parquet/part.0.parquet'. Detail: [errno 2] No such file or directory

### Roundtrip

In [18]:
ddf_result = dd.read_parquet('test_result.parquet')

In [19]:
# Note the differing dtypes
ddf_result.info(verbose=True)

<class 'dask.dataframe.core.DataFrame'>
Index: 639506962 entries, 0 to 791798
Data columns (total 15 columns):
 #   Column             Non-Null Count  Dtype
---  ------             --------------  -----
 0   test_id            639506962 non-null      int64[pyarrow]
 1   vehicle_id         639506962 non-null      int64[pyarrow]
 2   test_date          639506962 non-null      datetime64[ns]
 3   test_class_id      639506962 non-null      int64[pyarrow]
 4   test_type          639506962 non-null      string
 5   test_result        639506962 non-null      string
 6   test_mileage       632574944 non-null      int64[pyarrow]
 7   postcode_area      639506962 non-null      string
 8   make               639506927 non-null      string
 9   model              639505681 non-null      string
10   colour             639506962 non-null      string
11   fuel_type          639506962 non-null      string
12   cylinder_capacity  638550168 non-null      int64[pyarrow]
13   first_use_date     639479303 

In [None]:
ddf_result.head()

In [None]:
ddf_result.npartitions

In [None]:
rover_df = ddf_result.query('vehicle_id == 1238787680').compute()

In [None]:
rover_df

In [None]:
rover_df.set_index('test_date').test_mileage.plot(marker='.')

## Items

In [None]:
item_files_df = csv_info_df[csv_info_df.file.str.contains('item')]
item_files_df

In [None]:
def read_items(file, sep):
    df = dd.read_csv(file,
                     delimiter=sep,
                     dtype={'dangerous_mark': 'string[pyarrow]'},
                     dtype_backend='pyarrow',
                     # engine='pyarrow',
                    )
        
    return df

item_records = item_files_df[['file', 'delimiter']].to_records(index=False)
item_ddfs = [read_items(file, sep) for file, sep in item_records]

In [None]:
item_ddf = dd.concat(item_ddfs)

In [None]:
item_ddf.dtypes

In [None]:
item_ddf.info()
item_ddf.head()

In [None]:
f = (
    item_ddf
    .to_parquet('item.parquet', write_index=False, overwrite=True, compute=False)
    .persist()
)

progress(f)

In [None]:
item_ddf = dd.read_parquet('item.parquet')

In [None]:
rfr_counts = item_ddf.rfr_id.value_counts().compute()

In [None]:
rfr_counts

## Reasons for rejection (test fail)

Not sure if this includes advisories

In [None]:
rfr_df = dd.read_csv('data/unzipped/dft_item_detail.csv', sep='|', dtype_backend='pyarrow')
rfr_df.info(verbose=True)
rfr_df.head()

In [None]:
rfr_df.query('rfr_id == 8394').compute()