Load all accessible cyceling data from tfl.gov.uk S3 buckets3s3

In [2]:
import boto3

from botocore import UNSIGNED
from botocore.config import Config
from botocore.handlers import disable_signing
from pathlib import Path

#s3 = boto3.client('s3',config=Config(signature_version=UNSIGNED))
s3 = boto3.resource('s3')
s3.meta.client.meta.events.register('choose-signer.s3.*', disable_signing)

tfl_bucket = s3.Bucket('cycling.data.tfl.gov.uk')

In [3]:
print(tfl_bucket.get_available_subresources())

['Acl', 'Cors', 'Lifecycle', 'LifecycleConfiguration', 'Logging', 'Notification', 'Object', 'Policy', 'RequestPayment', 'Tagging', 'Versioning', 'Website']


In [4]:
from pathlib import Path
from tqdm import tqdm

data_path = Path('data')
data_path.mkdir(parents=True,exist_ok=True)


bucker_resources = tfl_bucket.objects.filter(Prefix='usage-stats/')
res_len = 0
for _ in bucker_resources:
    res_len += 1

bucker_resources = tfl_bucket.objects.filter(Prefix='usage-stats/')

for res in tqdm(bucker_resources,total=res_len):
    #print(res.key,res.key.split('/')[-1])
    try:
        path = Path('data') / Path(res.key.split('/')[-1])
        #print("Save to",path)
        tfl_bucket.download_file(res.key,str(path))
    except Exception as e:
        #print(e)
        continue

100%|██████████| 371/371 [07:56<00:00,  1.28s/it]


In [5]:
import pandas as pd
import dask.dataframe as ddf
import dask.diagnostics as ddiag
from tqdm.dask import TqdmCallback
from pathlib import Path

In [6]:
files = [ x for x in Path('data').iterdir() if x.suffix == '.csv']

Convert data to parquet files as they are easy to read

In [28]:
sv_path = Path('data')/Path('compressed')
sv_path.mkdir(parents=True,exist_ok=True)
for file in tqdm(files):
    df = pd.read_csv(file)
    df.to_parquet(sv_path/Path(file.stem+'.parquet'))


100%|██████████| 363/363 [02:31<00:00,  2.40it/s]


In [9]:
parquetta = [ x for x in (Path('data')/'compressed').iterdir() ]

In [10]:
df_list = list()
for file in tqdm(parquetta):
    df_list.append(pd.read_parquet(file))
gdf = pd.concat(df_list)
print(not gdf['Rental Id'].is_unique)

100%|██████████| 363/363 [00:24<00:00, 14.58it/s]


True


In [11]:
print(gdf.size)
gdf.to_parquet(Path('data')/'full_data.parquet')

1626367100


In [None]:
gdf['Start Date'] = pd.to_datetime(gdf['Start Date'],exact=False,infer_datetime_format=True)

In [18]:
from dask.diagnostics import progress

with progress.ProgressBar():
    dgdf = ddf.from_pandas(gdf,32)

In [19]:
dgdf['End Date'] = ddf.to_datetime(dgdf['End Date'],exact=False,errors='coerce',infer_datetime_format=True)
dgdf['Start Date'] = ddf.to_datetime(dgdf['Start Date'],exact=False,errors='coerce',infer_datetime_format=True)

In [20]:
with progress.ProgressBar():
    dgdf.compute()

[########################################] | 100% Completed | 410.04 s


In [21]:
with progress.ProgressBar():
    nacount = dgdf.count().compute()


[########################################] | 100% Completed | 461.70 s


In [22]:
dgdf.head()

Unnamed: 0,Rental Id,Duration,Bike Id,End Date,EndStation Id,EndStation Name,Start Date,StartStation Id,StartStation Name,Unnamed: 9,Unnamed: 10,Unnamed: 11,EndStation Logical Terminal,endStationPriority_id,StartStation Logical Terminal,Duration_Seconds,End Station Id,End Station Name,Start Station Id,Start Station Name
0,50754225,240.0,11834.0,2016-10-01 00:04:00,383.0,"Frith Street, Soho",2016-10-01 00:00:00,18.0,"Drury Lane, Covent Garden",,,,,,,,,,,
0,56723340,720.0,13873.0,2016-03-08 00:12:00,426.0,"Vincent Street, Pimlico",2016-03-08 00:00:00,160.0,"Waterloo Place, St. James's",,,,,,,,,,,
0,71029721,180.0,13482.0,2017-02-11 18:23:00,67.0,"Hatton Garden, Holborn",2017-02-11 18:20:00,71.0,"Newgate Street , St. Paul's",,,,,,,,,,,
0,93611465,360.0,15418.0,2019-10-12 09:02:00,71.0,"Newgate Street , St. Paul's",2019-10-12 08:56:00,101.0,"Queen Street 1, Bank",,,,,,,,,,,
0,89258337,480.0,13889.0,NaT,11.0,"Brunswick Square, Bloomsbury",NaT,189.0,"Claremont Square, Angel",,,,,,,,,,,


In [24]:
print(nacount)

Rental Id                        81318355
Duration                         79899342
Bike Id                          81318337
End Date                         79549014
EndStation Id                    79356048
EndStation Name                  79897831
Start Date                       79713559
StartStation Id                  79835469
StartStation Name                80067969
Unnamed: 9                              0
Unnamed: 10                             0
Unnamed: 11                             0
EndStation Logical Terminal        229639
endStationPriority_id              229639
StartStation Logical Terminal      232500
Duration_Seconds                  1250345
End Station Id                    1250386
End Station Name                  1250386
Start Station Id                  1250386
Start Station Name                1250386
dtype: int64
