# Data Types

In [1]:
#!pip install pandas
#!pip install pip-autoremove

# CSV

In [12]:
data_path = "/Users/resitkadir/Desktop/Python-mix/2-Data_Ingestion_Python/data/"

2.1

In [15]:
"""Load & convert data from CSV file using Python built-in csv module"""
import bz2
import csv
from collections import namedtuple
from datetime import datetime

Column = namedtuple('Column', 'src dest convert')


def parse_timestamp(text):
    return datetime.strptime(text, '%Y-%m-%d %H:%M:%S')


columns = [
    Column('VendorID', 'vendor_id', int),
    Column('passenger_count', 'num_passengers', int),
    Column('tip_amount', 'tip', float),
    Column('total_amount', 'price', float),
    Column('tpep_dropoff_datetime', 'dropoff_time', parse_timestamp),
    Column('tpep_pickup_datetime', 'pickup_time', parse_timestamp),
    Column('trip_distance', 'distance', float),
]


def iter_records(file_name):
    with bz2.open(file_name, 'rt') as fp:
        reader = csv.DictReader(fp)
        for csv_record in reader:
            record = {}
            for col in columns:
                value = csv_record[col.src]
                record[col.dest] = col.convert(value)
            yield record


def example():
    from pprint import pprint

    for i, record in enumerate(iter_records(data_path+'taxi.csv.bz2')):
        if i >= 10:
            break
        pprint(record)


example()

{'distance': 2.57,
 'dropoff_time': datetime.datetime(2018, 11, 1, 6, 43, 24),
 'num_passengers': 1,
 'pickup_time': datetime.datetime(2018, 10, 31, 7, 10, 55),
 'price': 20.54,
 'tip': 4.74,
 'vendor_id': 2}
{'distance': 3.58,
 'dropoff_time': datetime.datetime(2018, 10, 31, 16, 50, 10),
 'num_passengers': 5,
 'pickup_time': datetime.datetime(2018, 10, 31, 16, 38, 25),
 'price': 13.8,
 'tip': 0.0,
 'vendor_id': 2}
{'distance': 2.39,
 'dropoff_time': datetime.datetime(2018, 10, 31, 20, 31, 47),
 'num_passengers': 1,
 'pickup_time': datetime.datetime(2018, 10, 31, 20, 23, 41),
 'price': 11.3,
 'tip': 1.0,
 'vendor_id': 2}
{'distance': 0.5,
 'dropoff_time': datetime.datetime(2018, 10, 31, 22, 48, 28),
 'num_passengers': 1,
 'pickup_time': datetime.datetime(2018, 10, 31, 22, 44, 24),
 'price': 5.8,
 'tip': 0.0,
 'vendor_id': 2}
{'distance': 1.81,
 'dropoff_time': datetime.datetime(2018, 10, 31, 23, 35, 30),
 'num_passengers': 1,
 'pickup_time': datetime.datetime(2018, 10, 31, 23, 22, 18),

In [8]:
#2.2 #pandas convert all data type automatically into relevant onces, except timestaps#
#csv_pv csv_pandas

In [17]:
"""Load & converting data from CSV using Pandas"""
import pandas as pd

time_cols = ['tpep_dropoff_datetime', 'tpep_pickup_datetime']

def load_df(file_name):
    return pd.read_csv(data_path+'taxi.csv.bz2', parse_dates=time_cols)

print(load_df('taxi.csv.bz2').head(2))


   VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0         2  2018-10-31 07:10:55   2018-11-01 06:43:24                1   
1         2  2018-10-31 16:38:25   2018-10-31 16:50:10                5   

   trip_distance  RatecodeID store_and_fwd_flag  PULocationID  DOLocationID  \
0           2.57           1                  N           211            48   
1           3.58           1                  N           237           144   

   payment_type  fare_amount  extra  mta_tax  tip_amount  tolls_amount  \
0             1         14.5    0.5      0.5        4.74           0.0   
1             2         12.5    0.5      0.5        0.00           0.0   

   improvement_surcharge  total_amount  
0                    0.3         20.54  
1                    0.3         13.80  


# XML

In [19]:
"""Load rides data from XML"""

import bz2
import xml.etree.ElementTree as xml

import pandas as pd

# Data conversions
conversion = [
    ('vendor', int),
    ('people', int),
    ('tip', float),
    ('price', float),
    ('pickup', pd.to_datetime),
    ('dropoff', pd.to_datetime),
    ('distance', float),
]


def iter_rides(file_name):
    with bz2.open(file_name, 'rt') as fp:
        tree = xml.parse(fp)

    rides = tree.getroot()
    for elem in rides:
        record = {}
        for tag, func in conversion:
            text = elem.find(tag).text
            record[tag] = func(text)
        yield record


def load_xml(file_name):
    records = iter_rides(file_name)
    return pd.DataFrame.from_records(records)


# Example
if __name__ == '__main__':
    df = load_xml(data_path+'taxi.xml.bz2')
    print(df.dtypes)
    print(df.head())

vendor               int64
people               int64
tip                float64
price              float64
pickup      datetime64[ns]
dropoff     datetime64[ns]
distance           float64
dtype: object
   vendor  people   tip  price              pickup             dropoff  \
0       2       1  4.74  20.54 2018-10-31 07:10:55 2018-11-01 06:43:24   
1       2       5  0.00  13.80 2018-10-31 16:38:25 2018-10-31 16:50:10   
2       2       1  1.00  11.30 2018-10-31 20:23:41 2018-10-31 20:31:47   
3       2       1  0.00   5.80 2018-10-31 22:44:24 2018-10-31 22:48:28   
4       2       1  2.26  13.56 2018-10-31 23:22:18 2018-10-31 23:35:30   

   distance  
0      2.57  
1      3.58  
2      2.39  
3      0.50  
4      1.81  


# Parquet to CsV

In [23]:
import pyarrow.parquet as pq

table = pq.read_table(data_path+"taxi.parquet")

#apache arrow
df = table.to_pandas()

type(df)

pandas.core.frame.DataFrame

# Parse Logs

In [25]:
#https://pythex.org/ #--regex

In [38]:
"""Convert unstructured ride text to JSON"""
import bz2
import logging
import re
from datetime import datetime


def parse_line(line):
    # Example:
    # Ride of 1 passenger started at 2018-10-31T07:10:55 and paid $20.54
    match = re.search(
        r'(\d+) pass.*started at ([^ ]+).*paid \$(\d+\.\d+)',
        line)
    if not match:
        return None

    return {
        'count': int(match.group(1)),
        'start': datetime.fromisoformat(match.group(2)),
        'amount': float(match.group(3)),
    }


def iter_rides(file_name):
    with bz2.open(file_name, 'rt') as fp:
        for lnum, line in enumerate(fp, 1):
            record = parse_line(line)
            if not record:
                logging.warning('%s: cannot parse line', lnum)
                continue
            yield record


# Example
if __name__ == '__main__':
    from pprint import pprint

    for n, ride in enumerate(iter_rides(data_path+'taxi.log.bz2')):
        #print(ride )
        if n > 5:
        
            break
        pprint(ride)



#pythox--> regex playground

{'amount': 20.54,
 'count': 1,
 'start': datetime.datetime(2018, 10, 31, 7, 10, 55)}
{'amount': 13.8,
 'count': 5,
 'start': datetime.datetime(2018, 10, 31, 16, 38, 25)}
{'amount': 11.3,
 'count': 1,
 'start': datetime.datetime(2018, 10, 31, 20, 23, 41)}
{'amount': 5.8,
 'count': 1,
 'start': datetime.datetime(2018, 10, 31, 22, 44, 24)}
{'amount': 13.56,
 'count': 1,
 'start': datetime.datetime(2018, 10, 31, 23, 22, 18)}
{'amount': 24.3,
 'count': 1,
 'start': datetime.datetime(2018, 10, 31, 23, 27, 39)}


## Julia-JSON

In [48]:
"""Calculate average ride duration, from file with JSON object per line"""

import json
from datetime import datetime, timedelta


def parse_time(ts):
    """
    >>> parse_time('2018-10-31T07:10:55.000Z')
    datetime.datetime(2018, 10, 31, 7, 10, 55)
    """
    # [:-1] trims Z suffix
    return datetime.fromisoformat(ts[:-1])


def fix_pair(pair):
    key, value = pair
    if key not in ('pickup', 'dropoff'):
        return pair
    return key, parse_time(value)


def pairs_hook(pairs):
    return dict(fix_pair(pair) for pair in pairs)


durations = []
with open(data_path+'taxi.jl') as fp:
    for line in fp:
        obj = json.loads(line, object_pairs_hook=pairs_hook)
        duration = obj['dropoff'] - obj['pickup']
        durations.append(duration)
    print(line)



{"vendor":1,"pickup":"2018-11-01T11:04:08.000Z","dropoff":"2018-11-01T11:10:34.000Z","distance":1.2,"tip":0.0,"total":7.8}



In [44]:
avg_duration = sum(durations, timedelta()) / len(durations)
print(f'average ride duration: {avg_duration}')

average ride duration: 0:17:28.555600


# CSVtoJSON

In [52]:
"""Convert CSV to JSON (one object per line)"""
import bz2
import csv
import json
from collections import namedtuple
from datetime import datetime

Column = namedtuple('Column', 'src dest convert')


def parse_timestamp(text):
    return datetime.strptime(text, '%Y-%m-%d %H:%M:%S')


columns = [
    Column('VendorID', 'vendor_id', int),
    Column('passenger_count', 'num_passengers', int),
    Column('tip_amount', 'tip', float),
    Column('total_amount', 'price', float),
    Column('tpep_dropoff_datetime', 'dropoff_time', parse_timestamp),
    Column('tpep_pickup_datetime', 'pickup_time', parse_timestamp),
    Column('trip_distance', 'distance', float),
]


def iter_records(file_name):
    with bz2.open(file_name, 'rt') as fp:
        reader = csv.DictReader(fp)
        for csv_record in reader:
            record = {}
            for col in columns:
                value = csv_record[col.src]
                record[col.dest] = col.convert(value)
            yield record


def encode_time(obj):
    if not isinstance(obj, datetime):
        return obj
    return obj.isoformat()


with open(data_path+'taxi.jl', 'w') as out:
    for record in iter_records(data_path+'taxi.csv.bz2'):
        data = json.dumps(record, default=encode_time)
        out.write(f'{data}\n')
