Data loss while writing avro file to s3 compatible storage #209
Comments
Thank you for providing example source. Could you please describe what the problem is in detail? An example of the data loss, as well as how you detected it, would be very helpful. |
I converted the Avro file to JSON and counted the number of lines. I expected the number of lines in the JSON to match the number of lines in the CSV, but it didn't. |
Have you confirmed that this problem still persists if you use local storage instead of smart_open? |
Yes, this problem doesn't exist if I use local storage. However, the file on local storage was not written through smart_open. |
From your example source, I can see that you're reading the CSV file with smart_open too. Is the CSV file being read correctly? In other words, does the problem still persist if the CSV file is loaded from local storage? |
Yes, the file is being read correctly. I counted the number of lines in the dataframe I loaded, and I also counted the DatumWriter count. They match, but after writing the Avro file to S3 and reading it back, I see that there is data loss. This confirms that the problem is with writing binary data to S3. |
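For illustration, a minimal sketch of this kind of check, counting DataFrame rows against records read back from the written Avro file; the file names here are hypothetical, not the reporter's actual paths:

import avro.datafile
import avro.io
import pandas as pd

# Hypothetical paths: 'test.csv' is the source data, 'output.avro' is the file that was written out.
data = pd.read_csv('test.csv', dtype='str').fillna('NA')
num_csv_rows = len(data.index)

with open('output.avro', 'rb') as fin:
    reader = avro.datafile.DataFileReader(fin, avro.io.DatumReader())
    num_avro_rows = sum(1 for _ in reader)
    reader.close()

# A mismatch here is the "data loss" being reported.
print(num_csv_rows, num_avro_rows)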
What version of smart_open are you using? Have you observed a similar problem with older versions? |
smart_open 1.3.5 with the Anaconda version of Python 2.7. |
I haven't tried with other versions |
OK, thank you for providing detailed information about this bug. We will investigate. |
thanks |
Hi, do you have any update? |
Sorry, no, I have not looked into this yet. Is this urgent for you? |
Yes, this is a blocker for us. We are also looking for an alternative to smart_open but haven't found one yet. At least a workaround ASAP would be great. |
I think the workaround is to write the avro file to local storage first, and upload to S3 when it is complete. I've started investigating the issue, but cannot reproduce the problem because your code sample is incomplete: the gen_schema function is missing. Could you please look at this file: https://github.com/mpenkov/smart_open/blob/209/integration-tests/test_209.py and update it so that it reproduces your problem? |
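For illustration, a minimal sketch of that workaround under assumed paths and bucket names, using the smart_open 1.x API seen elsewhere in this thread: write and close the Avro file locally, then stream the finished file to S3.

import shutil
import smart_open

def upload_finished_avro(local_path, s3_url):
    # The local file is already written and closed, so all Avro buffers
    # and metadata are on disk before the upload begins.
    with open(local_path, 'rb') as fin, smart_open.smart_open(s3_url, 'wb') as fout:
        shutil.copyfileobj(fin, fout)

# upload_finished_avro('local.avro', 's3://my-bucket/path/data.avro')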
Thanks for investigating the issue. Writing to local storage is not an option for us because of the storage constraints and latency it involves. |
Where is the updated file? |
I have updated the code in the test_209.py you provided. It shows as pushed. |
I think your push failed. Here’s the file in your repo: https://github.com/vinuthna91/smart_open/blob/209/integration-tests/test_209.py It’s unchanged from my version. If you can’t work it out, please paste the code here as a comment. |
I changed it. I am sorry for the delayed response; I was out of town. https://github.com/vinuthna91/smart_open/blob/209/integration-tests/test_209.py

def gen_schema(paramNames):
    paramNamesLen = len(paramNames)
    dataName = 'schema'
    avroSchemaOut = "{\n\t\"type\": \"record\", \"name\": \"%s\", \"namespace\": \"com.sandisk.bigdata\", \n \t\"fields\": [" % (dataName)
    if paramNamesLen == 0:
        # no parameters, no schema file generation
        avroSchemaOut = ''
    else:
        # generate the schema string field by field
        for ii in range(paramNamesLen):
            typeString = "[\"%s\", \"null\"]" % ('String')
            schemaString = "{ \"name\":\"%s\", \"type\":%s, \"default\":null}" % (paramNames[ii], typeString)
            if ii == 0:
                avroSchemaOut += schemaString + ',\n'
            elif ii < len(paramNames) - 1:
                avroSchemaOut += "\t\t\t" + schemaString + ',\n'
            else:
                avroSchemaOut += "\t\t\t" + schemaString + '\n'
        avroSchemaOut += "\n \t\t\t]\n}"
    return avroSchemaOut |
I pulled your changes and made some updates. Unfortunately, the code crashes with an error unrelated to smart_open:
Please have a look and update the code. The updated code is here: https://github.com/mpenkov/smart_open/blob/209/integration-tests/test_209.py |
Thank you for the update. Unfortunately, I still cannot reproduce your problem because of errors in avro.
Can you please make sure the test runs and reproduces your actual problem? |
Can you give me the CSV you used? |
The CSV gets downloaded as part of the actual test. Please see the source file. |
Could you please check now? I updated the code. |
Were you able to try it? Please confirm that the code is working now. |
Yes, the code is working now. I was able to use it to reproduce the problem. I also simplified it a bit to rule out errors in the actual test: https://github.com/mpenkov/smart_open/blob/209/integration-tests/test_209.py I'll take it from here. Thanks for your help in reproducing the problem. |
Without seeing your actual test case (source code or at least shell commands), it's hard for me to comment. I ran an additional test to ensure that there is no data loss when reading files from S3:
I downloaded the CSV file using 1) smart_open 2) AWS CLI and compared the two files. They were identical, so there is no data loss in the reading case, either. |
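For reference, a rough sketch of that comparison, with hypothetical paths; 'cli-copy.csv' is assumed to have been downloaded separately with the AWS CLI:

import hashlib
import smart_open

with smart_open.smart_open('s3://my-bucket/path/test.csv', 'rb') as fin:
    via_smart_open = fin.read()

with open('cli-copy.csv', 'rb') as fin:
    via_cli = fin.read()

# Identical digests mean nothing was lost or altered on the read path.
print(hashlib.md5(via_smart_open).hexdigest() == hashlib.md5(via_cli).hexdigest())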
I used the same code you provided, except that I used my local data set. My CSV file is only 90 KB. On line number 72, I have added #writer.close(). When writer.close() is uncommented, writing local.avro to local storage completes and there is no data loss. However, writing to S3 storage throws an error: AttributeError: 'S3OpenWrite' object has no attribute 'flush'. When you comment it out (writer.close() is not used), local.avro and remote.avro are both the same, but there is data loss in both files compared to the CSV data. |
I don't know how |
Oh, the truncation is just because of the |
I don't see truncation in the number of rows in the pandas dataframe; I see it only in the Avro file. I am counting the number of records written to the local Avro file when we use writer.close() and when we do not. |
When I read in all the data using |
When writer.close() is called, it internally calls a flush() method that flushes the current state of the file, including metadata. The error I am getting is 'S3OpenWrite' object has no attribute 'flush', because of which the metadata is being lost. |
I don't see that exception from the test script, but yeah, that would make sense. |
@scottbelden That is exactly the data loss I am talking about. If you uncomment writer.close() on line 72, you should see the exception while writing the Avro file to S3. |
I'm not sure which script has |
I made the change to the test script. If you do not see the change, you need to add writer.close() to the write_avro function definition: def write_avro(foutd): |
OK, I think see what's going on here. I'll look at this again on the weekend. |
Thanks! |
Could you find anything? |
It's still Thursday. Weekend starts in two days. |
Okay. |
OK, it's the weekend, and I've had a look at it. Time to clear things up. The "data loss" referred to in this ticket does not come from smart_open. It comes from misusing avro. You need to either call writer.close() explicitly once you have appended all the records, or use the DataFileWriter as a context manager (a with block).
Both work identically. If you don't do one of them, avro keeps some data in its internal buffers and never writes it to the output file. However, if you do the above while using smart_open, avro ends up calling BufferedOutputWriter.close twice. The first time succeeds, but the second time fails due to a bug in that method. I've added a test and fixed that bug. Strangely, I haven't been able to reproduce the errors related to the absence of a flush method. @vinuthna91, can you reproduce the error with the new code? |
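For reference, a minimal sketch of the two equivalent patterns; the schema, rows, and S3 URLs are placeholders, not the actual test script:

import avro.datafile
import avro.io
import avro.schema
import smart_open

schema_json = '{"type": "record", "name": "row", "fields": [{"name": "value", "type": ["null", "string"], "default": null}]}'
schema = avro.schema.parse(schema_json)
rows = [{'value': 'a'}, {'value': 'b'}]

# Pattern 1: explicit close() -- flushes avro's internal buffers and writes the file metadata.
with smart_open.smart_open('s3://my-bucket/explicit-close.avro', 'wb') as fout:
    writer = avro.datafile.DataFileWriter(fout, avro.io.DatumWriter(), schema)
    for row in rows:
        writer.append(row)
    writer.close()

# Pattern 2: context manager -- close() is called automatically on exit.
with smart_open.smart_open('s3://my-bucket/context-manager.avro', 'wb') as fout:
    with avro.datafile.DataFileWriter(fout, avro.io.DatumWriter(), schema) as writer:
        for row in rows:
            writer.append(row)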
@mpenkov Thanks for looking into it. Unfortunately, I used the same code and it still throws the error about the absence of a flush method.
Since I get the error, I also added another method that does not close the writer manually. This is how I am trying to write now:

def write_avro_no_manual_close(foutd):
    schema = avro.schema.parse(avroSchemaOut)
    dictRes = data.to_dict(orient='records')
    writer = avro.datafile.DataFileWriter(foutd, avro.io.DatumWriter(), schema)
    for ll, row in enumerate(dictRes):
        writer.append(row)
    #writer.close()

def write_avro_context_manager(foutd):
    schema = avro.schema.parse(avroSchemaOut)
    dictRes = data.to_dict(orient='records')
    with avro.datafile.DataFileWriter(foutd, avro.io.DatumWriter(), schema) as writer:
        for ll, row in enumerate(dictRes):
            writer.append(row)

@mock.patch('avro.datafile.DataFileWriter.generate_sync_marker', mock.Mock(return_value=b'0123456789abcdef'))
def write_avro_manual_close(foutd):
    schema = avro.schema.parse(avroSchemaOut)
    dictRes = data.to_dict(orient='records')
    writer = avro.datafile.DataFileWriter(foutd, avro.io.DatumWriter(), schema)
    for ll, row in enumerate(dictRes):
        writer.append(row)
    writer.close()

with open('local.avro', 'wb') as foutd:
    logging.critical('writing to %r', foutd)
    write_avro_context_manager(foutd)

with open('local-so.avro', 'wb') as foutd:
    logging.critical('writing to %r', foutd)
    write_avro_manual_close(foutd)

with open('local-nomanual.avro', 'wb') as foutd:
    logging.critical('writing to %r', foutd)
    write_avro_no_manual_close(foutd)

I clearly see that local.avro and local-so.avro are identical, but not identical to local-nomanual.avro. I am also pasting the full code I am using for your reference:

import os
import os.path as P
import avro.io
import avro.datafile
import pandas as pn
import smart_open
import six
import subprocess
import warnings
import boto
from boto.compat import urlsplit, six
import boto.s3.connection
import logging
import json

###### Added by me
access_key_id = access_key_id
secret_access_key = secret_access_key
port = port
hostname = hostname
##########

with warnings.catch_warnings():
    warnings.simplefilter('ignore')
    import pandas as pn

logging.basicConfig(level=logging.ERROR)

if six.PY3:
    assert False, 'this code only runs on Py2.7'

_S3_URL = 's3://bucket-for-testing/user/vinuthna'
assert _S3_URL is not None, 'please set the SO_S3_URL environment variable'

'''
_NUMROWS = os.environ.get('SO_NUMROWS')
if _NUMROWS is not None:
    _NUMROWS = int(_NUMROWS)
'''

def gen_schema(data):
    schema = {
        'type': 'record', 'name': 'data', 'namespace': 'namespace',
        'fields': [
            {'name': field, 'type': ['null', 'string'], 'default': None}
            for field in data.columns
        ]
    }
    return json.dumps(schema, indent=4)

def write_avro_nomanualclose(foutd):
    schema = avro.schema.parse(avroSchemaOut)
    dictRes = data.to_dict(orient='records')
    writer_nomanual = avro.datafile.DataFileWriter(foutd, avro.io.DatumWriter(), schema)
    for ll, row in enumerate(dictRes):
        writer_nomanual.append(row)
    #writer_manual.close()

def write_avro_context_manager(foutd):
    schema = avro.schema.parse(avroSchemaOut)
    dictRes = data.to_dict(orient='records')
    with avro.datafile.DataFileWriter(foutd, avro.io.DatumWriter(), schema) as writer_contextManager:
        for ll, row in enumerate(dictRes):
            writer_contextManager.append(row)

def write_avro_manual_close(foutd):
    schema = avro.schema.parse(avroSchemaOut)
    dictRes = data.to_dict(orient='records')
    writer_manual = avro.datafile.DataFileWriter(foutd, avro.io.DatumWriter(), schema)
    for ll, row in enumerate(dictRes):
        writer_manual.append(row)
    writer_manual.close()

inputFilePath = P.join(_S3_URL, 'test.csv')
splitInputDir = urlsplit(inputFilePath, allow_fragments=False)

#logging.info(">>> establishing connection for inputDir")
#establish connection manually for customized options
inConn = boto.connect_s3(
    aws_access_key_id=access_key_id,
    aws_secret_access_key=secret_access_key,
    port=int(port),
    host=hostname,
    is_secure=False,
    calling_format=boto.s3.connection.OrdinaryCallingFormat(),
)

#get bucket
inbucket = inConn.get_bucket(splitInputDir.netloc)

# read in the csv file
kr = inbucket.get_key(splitInputDir.path)
assert kr is not None, 'File not present'
with smart_open.smart_open(kr, 'r') as fin:
    data = pn.read_csv(fin, header=1, error_bad_lines=False, dtype='str').fillna('NA')

num_csv_rows = len(data.index)
avroSchemaOut = gen_schema(data)

#No dataloss
with open('local.avro', 'wb') as foutd:
    logging.critical('writing to %r', foutd)
    write_avro_context_manager(foutd)

###No dataloss
with open('local-so.avro', 'wb') as foutd:
    logging.critical('writing to %r', foutd)
    write_avro_manual_close(foutd)

## Data loss observed
with open('local-nomanual.avro', 'wb') as foutd:
    logging.critical('writing to %r', foutd)
    write_avro_nomanualclose(foutd)

##Below works
outputfilepath = _S3_URL + '/nomanual.avro'
splitoutdir = urlsplit(outputfilepath, allow_fragments=False)
kw = inbucket.get_key(splitoutdir.path, validate=False)
with smart_open.smart_open(kw, 'wb') as foutd:
    logging.critical('writing to %r', foutd)
    write_avro_nomanualclose(foutd)

### This throws error
outputfilepath = _S3_URL + '/outcontext.avro'
splitoutdir = urlsplit(outputfilepath, allow_fragments=False)
kw = inbucket.get_key(splitoutdir.path, validate=False)
with smart_open.smart_open(kw, 'wb') as foutd:
    logging.critical('writing to %r', foutd)
    write_avro_context_manager(foutd) |
Also, in your test code, try reading the CSV file from S3, because my test case reads a CSV from S3, converts it to Avro, and writes the Avro back to S3. |
Whoa... Please format your code next time. Besides, most of that code is not relevant, because it demonstrates facts that we've already established (if you don't close the Avro writer, you'll lose some data).
Thank you for giving me details about your test case, but I don't think this is relevant to the problem. What version of avro are you using? |
Sure, I will format it. I am using version 2.0 |
That doesn't sound right.
The newest version in pip is 1.8.2. The current version on apache.org is also 1.8.2. |
Sorry, it is 1.8.0. I misread it. |
I tried to reproduce the problem with 1.8.0 but couldn't. In the immediate future, you could try mocking out the flush method:

# completely untested code!
with smart_open.smart_open(s3_url, 'wb') as fout:
    fout.flush = lambda: None
    write_avro(data, fout)

I'll look at adding flush support to the API. |
Thanks. It says the object is read-only. Traceback (most recent call last): |
Sounds like you're overriding flush for the wrong object.
This means that flush already exists for that particular object. |
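If it helps, an untested sketch of the same shim applied only when the file object really lacks flush, so an object that already has one is left alone; the URL is a placeholder:

import smart_open

s3_url = 's3://my-bucket/path/out.avro'  # placeholder URL
with smart_open.smart_open(s3_url, 'wb') as fout:
    if not hasattr(fout, 'flush'):
        fout.flush = lambda: None  # only patch objects that are missing flush
    write_avro(data, fout)  # write_avro and data as in the earlier snippet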
Got it. It's working now. Thanks a lot for the workaround :) |
Closed via #212 |
Hi,
I am converting a CSV file into Avro and writing it to S3-compliant storage. I see that the schema file (.avsc) is written properly. However, there is data loss while writing the .avro file.
Below is a snippet of my code