In [7]:
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
import json
import pandas as pd
from datetime import datetime
from avro_json_serializer import AvroJsonSerializer

def to_unix_time(dt):
    dt_obj = datetime.strptime(dt,'%Y-%m-%d %H:%M:%S')
    epoch = datetime(1970,1,1)
    result = dt_obj - epoch
    return result.total_seconds()

def to_boolean(string):
    if string == 'Y':
        return True
    elif string == 'N':
        return False


schema_dict = {"namespace": "example.avro",
 "type": "record",
 "name": "yellow_tripdata_2015-01",
 "fields": [
     {"name": "VendorID", "type": "int"},
     {"name": "tpep_pickup_datetime",  "type": "float"},
     {"name": "tpep_dropoff_datetime", "type": "float"},
     {"name": "passenger_count", "type": "int"},
     {"name": "trip_distance", "type": "float"},
     {"name": "pickup_longitude", "type": "double"},
     {"name": "pickup_latitude", "type": "double"},
     {"name": "RateCodeID", "type": "int"},
     {"name": "store_and_fwd_flag", "type": "boolean"},
     {"name": "dropoff_longitude", "type": "float"},
     {"name": "dropoff_latitude", "type": "float"},
     {"name": "payment_type", "type": "int"},
     {"name": "fare_amount", "type": "float"},
     {"name": "extra", "type": "float"},
     {"name": "mta_tax", "type": "float"},
     {"name": "tip_amount", "type": "float"},
     {"name": "tolls_amount", "type": "float"},
     {"name": "improvement_surcharge", "type": "float"},
     {"name": "total_amount", "type": "float"}
 ]
}

avro_schema = avro.schema.make_avsc_object(schema_dict, avro.schema.Names())
serializer = AvroJsonSerializer(avro_schema)
    
df = pd.read_csv('csv/head100_yellow_tripdata_2015-01.csv',
                 encoding='utf-8', engine='c')

df['tpep_pickup_datetime'] = df['tpep_pickup_datetime'].apply(to_unix_time)
df['tpep_dropoff_datetime'] = df['tpep_dropoff_datetime'].apply(to_unix_time)
df['store_and_fwd_flag'] = df['store_and_fwd_flag'].apply(to_boolean)

data = df.drop_duplicates().to_dict('record')[:10]

#schema = avro.schema.parse(open("yellow_tripdata_2015-01.avsc").read())
writer = DataFileWriter(open('head100_yellow_tripdata_2015-01.avro', 'wb'),
                    DatumWriter(),
                    avro_schema)
        
for r in data:
    json_str = serializer.to_json(r)
    writer.append(json_str)
writer.close()

AvroTypeException: The datum {"VendorID":2,"tpep_pickup_datetime":1421348739.0,"tpep_dropoff_datetime":1421349822.0,"passenger_count":1,"trip_distance":1.59,"pickup_longitude":-73.993896484375,"pickup_latitude":40.7501106262207,"RateCodeID":1,"store_and_fwd_flag":false,"dropoff_longitude":-73.97478485107422,"dropoff_latitude":40.75061798095703,"payment_type":1,"fare_amount":12.0,"extra":1.0,"mta_tax":0.5,"tip_amount":3.25,"tolls_amount":0.0,"improvement_surcharge":0.3,"total_amount":17.05} is not an example of the schema {
  "namespace": "example.avro", 
  "type": "record", 
  "name": "yellow_tripdata_2015-01", 
  "fields": [
    {
      "type": "int", 
      "name": "VendorID"
    }, 
    {
      "type": "float", 
      "name": "tpep_pickup_datetime"
    }, 
    {
      "type": "float", 
      "name": "tpep_dropoff_datetime"
    }, 
    {
      "type": "int", 
      "name": "passenger_count"
    }, 
    {
      "type": "float", 
      "name": "trip_distance"
    }, 
    {
      "type": "double", 
      "name": "pickup_longitude"
    }, 
    {
      "type": "double", 
      "name": "pickup_latitude"
    }, 
    {
      "type": "int", 
      "name": "RateCodeID"
    }, 
    {
      "type": "boolean", 
      "name": "store_and_fwd_flag"
    }, 
    {
      "type": "float", 
      "name": "dropoff_longitude"
    }, 
    {
      "type": "float", 
      "name": "dropoff_latitude"
    }, 
    {
      "type": "int", 
      "name": "payment_type"
    }, 
    {
      "type": "float", 
      "name": "fare_amount"
    }, 
    {
      "type": "float", 
      "name": "extra"
    }, 
    {
      "type": "float", 
      "name": "mta_tax"
    }, 
    {
      "type": "float", 
      "name": "tip_amount"
    }, 
    {
      "type": "float", 
      "name": "tolls_amount"
    }, 
    {
      "type": "float", 
      "name": "improvement_surcharge"
    }, 
    {
      "type": "float", 
      "name": "total_amount"
    }
  ]
}

In [8]:
for r in data:
    try:
        d = json.dumps(record,ensure_ascii=False)
        writer.append(d)
    except:
        for x,y in r.items():
            print(x,type(y), y)
        break

(u'trip_distance', <type 'float'>, 1.59)
(u'VendorID', <type 'int'>, 2)
(u'improvement_surcharge', <type 'float'>, 0.3)
(u'tip_amount', <type 'float'>, 3.25)
(u'total_amount', <type 'float'>, 17.05)
(u'tolls_amount', <type 'float'>, 0.0)
(u'tpep_dropoff_datetime', <type 'float'>, 1421349822.0)
(u'RateCodeID', <type 'int'>, 1)
(u'extra', <type 'float'>, 1.0)
(u'fare_amount', <type 'float'>, 12.0)
(u'pickup_longitude', <type 'float'>, -73.993896484375)
(u'dropoff_longitude', <type 'float'>, -73.97478485107422)
(u'passenger_count', <type 'int'>, 1)
(u'payment_type', <type 'int'>, 1)
(u'mta_tax', <type 'float'>, 0.5)
(u'store_and_fwd_flag', <type 'bool'>, False)
(u'pickup_latitude', <type 'float'>, 40.7501106262207)
(u'dropoff_latitude', <type 'float'>, 40.75061798095703)
(u'tpep_pickup_datetime', <type 'float'>, 1421348739.0)


In [91]:
import pandas as pd
from datetime import datetime

def to_unix_time(dt):
    dt_obj = datetime.strptime(dt,'%Y-%m-%d %H:%M:%S')
    epoch = datetime(1970,1,1)
    result = dt_obj - epoch
    return result.total_seconds()

def to_boolean(string):
    if string == 'Y':
        return True
    elif string == 'N':
        return False

df = pd.read_csv('csv/head100_yellow_tripdata_2015-01.csv',
                 encoding='utf-8', engine='c')

df['tpep_pickup_datetime'] = df['tpep_pickup_datetime'].apply(to_unix_time)
df['tpep_dropoff_datetime'] = df['tpep_dropoff_datetime'].apply(to_unix_time)
df['store_and_fwd_flag'] = df['store_and_fwd_flag'].apply(to_boolean)

df.isnull().any()

VendorID                 False
tpep_pickup_datetime     False
tpep_dropoff_datetime    False
passenger_count          False
trip_distance            False
pickup_longitude         False
pickup_latitude          False
RateCodeID               False
store_and_fwd_flag       False
dropoff_longitude        False
dropoff_latitude         False
payment_type             False
fare_amount              False
extra                    False
mta_tax                  False
tip_amount               False
tolls_amount             False
improvement_surcharge    False
total_amount             False
dtype: bool

In [66]:
df.dtypes

VendorID                   int64
tpep_pickup_datetime     float64
tpep_dropoff_datetime    float64
passenger_count            int64
trip_distance            float64
pickup_longitude         float64
pickup_latitude          float64
RateCodeID                 int64
store_and_fwd_flag          bool
dropoff_longitude        float64
dropoff_latitude         float64
payment_type               int64
fare_amount              float64
extra                    float64
mta_tax                  float64
tip_amount               float64
tolls_amount             float64
improvement_surcharge    float64
total_amount             float64
dtype: object

In [None]:
reader = DataFileReader(open("users.avro", "rb"), DatumReader())
for user in reader:
    print user
reader.close()