In [1]:
from utz import *

In [2]:
# Bucket the zipped trip-data archives are listed and downloaded from.
src_bkt = 'tripdata'
# Bucket the converted Parquet files are uploaded to.
dst_bkt = 'ctbk'
# Optional key prefix inside `dst_bkt`; None places the Parquet files at the bucket root.
dst_root = None

from boto3 import client
from botocore import UNSIGNED
from botocore.client import Config
# Module-level S3 client built with a default botocore Config; used below to list `src_bkt`.
# NOTE(review): UNSIGNED is imported above but not applied here — confirm signed requests are intended.
s3 = client('s3', config=Config())

In [3]:
# A single list_objects_v2 call returns at most 1000 keys, so walk every page
# to avoid silently dropping archives once the bucket grows past that limit.
paginator = s3.get_paginator('list_objects_v2')
contents = pd.DataFrame([
    obj
    for page in paginator.paginate(Bucket=src_bkt)
    for obj in page.get('Contents', [])  # a page with no objects has no 'Contents' entry
])
# Keep only the zipped archives; other objects in the bucket are ignored.
zips = contents[contents.Key.str.endswith('.zip')]
zips

Unnamed: 0,Key,LastModified,ETag,Size,StorageClass
0,201306-citibike-tripdata.zip,2018-04-30 13:18:55+00:00,"""b520a12de58eea58a3586f89bfcfbd9d-2""",16785103,STANDARD
1,201307-201402-citibike-tripdata.zip,2017-01-18 22:23:25+00:00,"""7b3b260b2ab2e5349320121d04bd821c-22""",178262576,STANDARD
2,201307-citibike-tripdata.zip,2017-01-18 22:23:27+00:00,"""dd3e6fd5f91715b31eae72868086c08c-4""",27074629,STANDARD
3,201308-citibike-tripdata.zip,2017-01-18 22:23:27+00:00,"""2f661063576734f614b9f1d6bba0ec59-4""",32090869,STANDARD
4,201309-citibike-tripdata.zip,2017-01-18 22:23:27+00:00,"""a42f947db7bd14e423a7dbfbb11596a1-4""",33155593,STANDARD
...,...,...,...,...,...
155,JC-202010-citibike-tripdata.csv.zip,2020-11-04 14:51:11+00:00,"""148431d3598f7e962338c33da2afddf3""",798066,STANDARD
156,JC-202011-citibike-tripdata.csv.zip,2020-12-04 23:26:04+00:00,"""ab9ee4bbbc03633d610e18319d23fc21""",569245,STANDARD
157,JC-202012-citibike-tripdata.csv.zip,2021-01-05 14:25:45+00:00,"""112033c48cf3fa673b396364a7cc08f6""",315012,STANDARD
158,JC-202101-citibike-tripdata.csv.zip,2021-02-08 15:11:26+00:00,"""02e5da50db92added528f438587bb1e7""",313806,STANDARD


In [4]:
# Pattern for recognized archive basenames. Named groups:
#   JC    - optional 'JC-' prefix
#   year  - four digits
#   month - two digits
#   csv   - optional '.csv' suffix
#   zip   - optional '.zip' suffix
# The separator after the month may be a space or a hyphen, and the final 'a' of
# 'tripdata' is optional, so slightly inconsistent names still match.
rgx = r'^(?P<JC>JC-)?(?P<year>\d{4})(?P<month>\d{2})[ \-]citibike-tripdata?(?P<csv>\.csv)?(?P<zip>\.zip)?$'

In [5]:
# Canonical column names; CSV headers are mapped onto these after being
# lower-cased and stripped of whitespace.
fields = {
  'Trip Duration',
  'Start Time',
  'Stop Time',
  'Start Station ID',
  'Start Station Name',
  'Start Station Latitude',
  'Start Station Longitude',
  'End Station ID',
  'End Station Name',
  'End Station Latitude',
  'End Station Longitude',
  'Bike ID',
  'User Type',
  'Birth Year',
  'Gender'
}
def normalize_field(f):
    """Return `f` lower-cased with every whitespace character removed."""
    lowered = f.lower()
    return sub(r'\s', '', lowered)
# Lookup table: normalized header (lower-case, no whitespace) -> canonical field name.
normalize_fields_map = dict(
    (normalize_field(canonical), canonical)
    for canonical in fields
)
normalize_fields_map

{'stoptime': 'Stop Time',
 'tripduration': 'Trip Duration',
 'endstationlongitude': 'End Station Longitude',
 'birthyear': 'Birth Year',
 'startstationlongitude': 'Start Station Longitude',
 'endstationname': 'End Station Name',
 'usertype': 'User Type',
 'gender': 'Gender',
 'startstationname': 'Start Station Name',
 'endstationlatitude': 'End Station Latitude',
 'startstationid': 'Start Station ID',
 'endstationid': 'End Station ID',
 'starttime': 'Start Time',
 'bikeid': 'Bike ID',
 'startstationlatitude': 'Start Station Latitude'}

In [6]:
def normalize_fields(df):
    """Return a copy of `df` whose columns are renamed to their canonical field names.

    Each column is normalized with `normalize_field` and looked up in
    `normalize_fields_map`; a column with no entry there raises KeyError.
    """
    renames = {}
    for original in df.columns:
        renames[original] = normalize_fields_map[normalize_field(original)]
    return df.rename(columns=renames)

In [7]:
from zipfile import ZipFile

In [8]:
from botocore.client import ClientError
def s3_exists(Bucket, Key, s3=None):
    """Return whether an object exists at `Key` in `Bucket`.

    Args:
        Bucket: name of the S3 bucket.
        Key: object key to probe.
        s3: S3 client to use; when omitted, an unsigned (anonymous) client is created.

    Returns:
        True when `head_object` succeeds, False when S3 reports the object as not found.

    Raises:
        ClientError: for any failure other than "not found" (e.g. access denied,
            throttling), so that such errors are not mistaken for a missing object.
    """
    if not s3:
        s3 = client('s3', config=Config(signature_version=UNSIGNED))
    try:
        s3.head_object(Bucket=Bucket, Key=Key)
    except ClientError as err:
        # HEAD responses carry no body, so a missing key surfaces as the bare HTTP status '404'.
        code = str(err.response.get('Error', {}).get('Code', ''))
        if code in ('404', 'NoSuchKey', 'NotFound'):
            return False
        raise
    return True

In [9]:
def to_parquet(zip_key, error='warn', overwrite=False):
    """Convert one zipped trip-data CSV from `src_bkt` into a Parquet object in `dst_bkt`.

    Args:
        zip_key: key of the source `.zip` object in `src_bkt`.
        error: with 'warn', a key whose basename does not match `rgx` is reported
            (printed and returned as a message); any other value raises instead.
        overwrite: when False, an already-existing destination object is left
            untouched and the call returns early.

    Returns:
        A status message: 'Unrecognized key: ...', 'Found ...; skipping',
        'Overwrote ...' or 'Wrote ...'.

    Raises:
        Exception: the key is unrecognized and `error` is not 'warn'.
        AssertionError: the key does not end in '.zip'.
        ValueError: the archive does not contain exactly one eligible CSV member.
    """
    name = basename(zip_key)
    m = match(rgx, name)
    if not m:
        msg = f'Unrecognized key: {name}'
        if error == 'warn':
            print(msg)
            return msg
        else:
            raise Exception(msg)
    # `rgx` treats the '.zip' suffix as optional, so enforce it explicitly.
    ext = splitext(zip_key)[1]
    assert ext == '.zip', f'Expected a .zip key: {zip_key}'

    # normalize the dst path; a few src files have typos/inconsistencies
    base = '%s%s%s-citibike-tripdata' % (m['JC'] or '', m['year'], m['month'])
    if dst_root is None:
        dst_key = f'{base}.parquet'
    else:
        dst_key = f'{dst_root}/{base}.parquet'
    dst = f's3://{dst_bkt}/{dst_key}'
    # A client is created per call rather than reusing the notebook-level one.
    s3 = client('s3', config=Config())
    if s3_exists(dst_bkt, dst_key, s3=s3):
        if overwrite:
            msg = f'Overwrote {dst}'
            print(f'Overwriting {dst}')
        else:
            msg = f'Found {dst}; skipping'
            print(msg)
            return msg
    else:
        msg = f'Wrote {dst}'

    with TemporaryDirectory() as d:
        zip_path = f'{d}/{base}.zip'
        pqt_path = f'{d}/{base}.parquet'
        s3.download_file(src_bkt, zip_key, zip_path)
        # Open the archive in a context manager so its file handle is always released.
        with ZipFile(zip_path) as z:
            names = z.namelist()
            print(f'{name}: zip names: {names}')
            # Entries whose path starts with '_' are skipped
            # (presumably macOS '__MACOSX/' metadata — TODO confirm).
            csv_names = [ f for f in names if f.endswith('.csv') and not f.startswith('_') ]
            if len(csv_names) != 1:
                raise ValueError(f'{name}: expected exactly one CSV in zip, found {csv_names}')
            [ csv_name ] = csv_names
            with z.open(csv_name, 'r') as i:
                df = pd.read_csv(i)

        df = normalize_fields(df)
        df = df.astype({'Start Time':'datetime64[ns]','Stop Time':'datetime64[ns]'})
        df.to_parquet(pqt_path)

        s3.upload_file(pqt_path, dst_bkt, dst_key)

        return msg

In [10]:
from joblib import delayed, Parallel
# joblib executor configured with one job per CPU, as reported by cpu_count().
parallel = Parallel(n_jobs=cpu_count())

In [11]:
# Convert every listed archive in parallel (default `error`/`overwrite` settings)
# and print the per-archive status messages, one per line.
print('\n'.join(
    parallel(delayed(to_parquet)(zip_key) for zip_key in zips.Key.values)
))

Found s3://ctbk/201306-citibike-tripdata.parquet; skipping
Unrecognized key: 201307-201402-citibike-tripdata.zip
Found s3://ctbk/201307-citibike-tripdata.parquet; skipping
Found s3://ctbk/201308-citibike-tripdata.parquet; skipping
Found s3://ctbk/201309-citibike-tripdata.parquet; skipping
Found s3://ctbk/201310-citibike-tripdata.parquet; skipping
Found s3://ctbk/201311-citibike-tripdata.parquet; skipping
Found s3://ctbk/201312-citibike-tripdata.parquet; skipping
Found s3://ctbk/201401-citibike-tripdata.parquet; skipping
Found s3://ctbk/201402-citibike-tripdata.parquet; skipping
Found s3://ctbk/201403-citibike-tripdata.parquet; skipping
Found s3://ctbk/201404-citibike-tripdata.parquet; skipping
Found s3://ctbk/201405-citibike-tripdata.parquet; skipping
Found s3://ctbk/201406-citibike-tripdata.parquet; skipping
Found s3://ctbk/201407-citibike-tripdata.parquet; skipping
Found s3://ctbk/201408-citibike-tripdata.parquet; skipping
Found s3://ctbk/201409-citibike-tripdata.parquet; skipping
Fo