
Diagnose, benchmark and provide guidance for loading large dataframe from BigQuery #329

Di-Ku opened this issue Mar 29, 2017 · 13 comments

@Di-Ku (Contributor) commented Mar 29, 2017

Customer query (via Tahir F.)

Do we have any benchmarks or recommendations for sizing a GCE instance based on the amount of data that will be brought into a pandas DataFrame?
The customer's question is as follows:
Could you advise me on an appropriate GCE machine spec?
We upgraded the GCE spec, but it only uses about 2% of CPU and still takes 5 minutes to load 500,000 rows into pandas.

Do you have any ideas for improving Datalab's performance here?
Could this be a network or disk issue?

@parthea (Contributor) commented Apr 3, 2017

I've also noticed that loading a large DataFrame from BigQuery is slow. My machine is outside of GCE, so I expect significant delays in downloading data. Still, my feeling (which I still need to confirm) is that the following process is quicker than bq.Query(<sqlModule>).to_dataframe(dialect='standard'); a rough sketch of the steps follows the list.

  • Run bq.Query(<sqlModule>).extract(<gcs object path>)
  • Then download the file using storage.Bucket(<my bucket>).item(<item>).read_from()
  • Followed by pd.read_csv(<path to csv file>)
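
A rough sketch of those three steps (hedged; the bucket and object names are placeholders, and my_sql_module stands in for whichever SQL module defines the query):

import pandas as pd
from StringIO import StringIO  # Python 2, matching the legacy datalab environment
import datalab.bigquery as bq
import datalab.storage as storage

# Step 1: export the query results to a CSV file in GCS
bq.Query(my_sql_module).extract('gs://my-bucket/results.csv',
                                dialect='standard', use_cache=False)
# Step 2: download the exported object into memory
data = storage.Bucket('my-bucket').item('results.csv').read_from()
# Step 3: parse the CSV into a dataframe
df = pd.read_csv(StringIO(data))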

Some initial profiling data:

Using bq.Query().extract(): Total Time => ~2 minutes, 39 seconds

bq.Query(<sqlModule>).extract(<gcs path>, dialect='standard', use_cache=False)
         167823 function calls (167674 primitive calls) in 158.961 seconds

   Ordered by: internal time
   List reduced from 431 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       17   85.070    5.004   85.070    5.004 {time.sleep}
    15523   65.955    0.004   65.955    0.004 {method 'read' of '_ssl._SSLSocket' objects}
       23    5.371    0.234    5.371    0.234 {_socket.getaddrinfo}
       23    1.411    0.061    1.411    0.061 {method 'do_handshake' of '_ssl._SSLSocket' objects}
       23    0.711    0.031    0.711    0.031 {method 'connect' of '_socket.socket' objects}
        2    0.263    0.132    0.263    0.132 {method 'read' of 'file' objects}
       23    0.064    0.003    0.064    0.003 {method 'load_verify_locations' of '_ssl._SSLContext' objects}
      704    0.018    0.000   66.000    0.094 socket.py:406(readline)
    15523    0.009    0.000   65.976    0.004 ssl.py:707(recv)
    15523    0.009    0.000   65.965    0.004 ssl.py:597(read)

Using storage.Bucket().item().read_from(): Total Time => ~14 seconds

  import datalab.storage as storage
  data = storage.Bucket(<my bucket>).item(<item>).read_from()
        31105 function calls (31093 primitive calls) in 14.182 seconds

   Ordered by: internal time
   List reduced from 284 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     3929    8.292    0.002    8.292    0.002 {method 'read' of '_ssl._SSLSocket' objects}
        1    5.042    5.042    5.042    5.042 {_socket.getaddrinfo}
        1    0.725    0.725    0.725    0.725 {method 'do_handshake' of '_ssl._SSLSocket' objects}
        1    0.029    0.029    0.029    0.029 {method 'connect' of '_socket.socket' objects}
     3960    0.019    0.000    0.019    0.000 {method 'write' of 'cStringIO.StringO' objects}
       43    0.017    0.000    7.696    0.179 socket.py:336(read)
       37    0.015    0.000    0.015    0.000 {method 'join' of 'str' objects}
       74    0.013    0.000    0.013    0.000 {method 'getvalue' of 'cStringIO.StringO' objects}
     3929    0.007    0.000    8.299    0.002 ssl.py:597(read)
     3929    0.007    0.000    8.307    0.002 ssl.py:707(recv)

Using bq.Query().to_dataframe(): Total Time => ~13 minutes

my_df = bq.Query(<sql module>).to_dataframe(dialect='standard')
         18047922 function calls (18020910 primitive calls) in 806.355 seconds

   Ordered by: internal time
   List reduced from 677 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   638122  337.576    0.001  337.576    0.001 {method 'read' of '_ssl._SSLSocket' objects}
      737  239.891    0.325  239.902    0.326 {_socket.getaddrinfo}
      737  149.742    0.203  149.763    0.203 {method 'connect' of '_socket.socket' objects}
      737   53.041    0.072   53.041    0.072 {method 'do_handshake' of '_ssl._SSLSocket' objects}
      740    5.044    0.007    5.044    0.007 decoder.py:372(raw_decode)
   749089    3.018    0.000    5.228    0.000 _parser.py:30(parse_row)
     3662    2.258    0.001    2.258    0.001 {numpy.core.multiarray.concatenate}
      737    2.170    0.003    2.170    0.003 {method 'load_verify_locations' of '_ssl._SSLContext' objects}
  2996356    1.783    0.000    2.210    0.000 _parser.py:40(parse_value)
     1280    0.954    0.001    1.923    0.002 internals.py:4526(_merge_blocks)


Time difference => more than 10 minutes!

I'm using the legacy datalab package. I'll post an update after I migrate to the google.datalab package with more profiling snippets.

I used the following code for profiling:

stats = %prun -q -r testFunction()
import sys
stats.stream = sys.stdout 
stats.sort_stats('time').print_stats(10)

@yebrahim (Contributor) commented Apr 3, 2017

Thanks for the investigation @parthea, these numbers are helpful.

Can you add more information on your experiments? Particularly:

  • What size of data did you use?
  • Any public tables in your query?
  • Maybe you could share the query itself
  • Which region/zone was the GCS bucket you used to export the data?
  • You should make sure use_cache is set to False when executing the query to get reliable results, otherwise multiple runs might give inconsistent times on the BigQuery service side.

I also think it's important to use the new google.datalab interface. Thanks for the investigation.

@parthea (Contributor) commented Apr 4, 2017

What size of data did you use?

I've changed my query to use a public dataset. I'm testing with 100,000 rows. After running the query, I see 'Bytes Processed 103 MB' in the BigQuery UI.

Any public tables in your query?
Maybe you could share the query itself

%%bq query -n get_github_mit_licences
SELECT * FROM `bigquery-public-data.github_repos.licenses`
WHERE license = 'mit'
LIMIT 100000

Which region/zone was the GCS bucket you used to export the data?

Multi-Regional/US

You should make sure use_cache is set to False when executing the query to get reliable results, otherwise multiple runs might give inconsistent times on the BigQuery service side.

Done

Here are my results using google.datalab:

Initially I used %prun to capture profiling data but I found that I had to use yappi to get additional multi-threading profile information. %prun reported {method 'acquire' of 'thread.lock' objects} whereas yappi reported more detail.

Here are the results from %prun. I modified dataframe() in _query_output.py to accept a page_size parameter so that I could switch between page sizes without rebuilding the Datalab image.

Page size is 1024
          23332 function calls (23307 primitive calls) in 64.413 seconds

   Ordered by: internal time
   List reduced from 421 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       14   59.898    4.278   59.898    4.278 {method 'acquire' of 'thread.lock' objects}
     2234    3.425    0.002    3.425    0.002 {method 'read' of '_ssl._SSLSocket' objects}
        2    0.393    0.197    0.393    0.197 {method 'read' of 'file' objects}
        2    0.326    0.163    0.326    0.163 {posix.waitpid}
        3    0.100    0.033    0.100    0.033 {method 'do_handshake' of '_ssl._SSLSocket' objects}
        3    0.083    0.028    0.083    0.028 {method 'connect' of '_socket.socket' objects}
        3    0.044    0.015    0.045    0.015 {_socket.getaddrinfo}
        2    0.032    0.016    0.032    0.016 {posix.read}
     2234    0.019    0.000    3.459    0.002 ssl.py:707(recv)
       92    0.014    0.000    3.482    0.038 socket.py:406(readline)


Page size is 100000
          27325 function calls (27300 primitive calls) in 21.788 seconds

   Ordered by: internal time
   List reduced from 423 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       14   15.095    1.078   15.095    1.078 {method 'acquire' of 'thread.lock' objects}
     2627    5.611    0.002    5.611    0.002 {method 'read' of '_ssl._SSLSocket' objects}
        2    0.351    0.175    0.351    0.175 {method 'read' of 'file' objects}
        2    0.330    0.165    0.330    0.165 {posix.waitpid}
        3    0.129    0.043    0.129    0.043 {method 'do_handshake' of '_ssl._SSLSocket' objects}
        3    0.080    0.027    0.081    0.027 {method 'connect' of '_socket.socket' objects}
        3    0.047    0.016    0.047    0.016 {_socket.getaddrinfo}
      124    0.019    0.000    5.677    0.046 socket.py:406(readline)
        2    0.016    0.008    0.016    0.008 {posix.read}
     2627    0.013    0.000    5.627    0.002 ssl.py:597(read)

I used the following code:

# gcc is needed to build yappi; graphviz is needed to visualize the yappi output
!apt-get -qq update && apt-get -qq -y install gcc graphviz
# yappi is used to profile multi-threaded code; gprof2dot converts yappi output for visualization
!pip install yappi gprof2dot
%%bq query -n get_github_mit_licences
SELECT * FROM `bigquery-public-data.github_repos.licenses`
WHERE license = 'mit'
LIMIT 100000
import yappi
import google.datalab.bigquery as bq
import google.datalab.storage as storage
import sys



def testFunction1(page_size):
  df = get_github_mit_licences.execute(output_options=bq.QueryOutput.dataframe(use_cache=False, page_size=page_size)).result()
def testFunction2():
  get_github_mit_licences.execute(output_options=bq.QueryOutput.file(path='gs://partheniou/test1', use_cache=False)).result()
  test = storage.Bucket('partheniou').object('test1').download()
  
print('Page size is 1024')
yappi.clear_stats()
yappi.start()
stats = %prun -q -r testFunction1(page_size=1024)
stats.stream = sys.stdout 
stats.sort_stats('time').print_stats(10)
yappi.stop()
yappi.get_func_stats().save('page_size_1024.callgrind', type='callgrind')
!gprof2dot --format=callgrind --output=page_size_1024.dot /content/page_size_1024.callgrind
!dot -Tpng page_size_1024.dot -o page_size_1024.png
yappi.clear_stats()

print('Page size is 100000')
yappi.start()
stats = %prun -q -r testFunction1(page_size=100000)
stats.stream = sys.stdout 
stats.sort_stats('time').print_stats(10)
yappi.stop()
yappi.get_func_stats().save('page_size_100000.callgrind', type='callgrind')
!gprof2dot --format=callgrind --output=page_size_100000.dot /content/page_size_100000.callgrind
!dot -Tpng page_size_100000.dot -o page_size_100000.png
yappi.clear_stats()

Output from yappi (converted to PNG):
[call graph image: page_size_100000]
[call graph image: page_size_1024]

It's clear that with this data size it would be better to use a page size of 100000. A potential next step could be to modify the SQL query to return more rows (e.g. raise or remove the LIMIT 100000 clause).

@parthea (Contributor) commented Apr 4, 2017

I reverted my local changes because I didn't cater for _DEFAULT_PAGE_SIZE in __getitem__ at
https://github.com/googledatalab/pydatalab/blob/master/google/datalab/bigquery/_table.py#L744

I repeated the tests with the following code and the latest pydatalab at master using my local setup (not in GCE). I also removed the limit clause in my SQL query to test a larger size (1667267 rows).

!apt-get -qq update && apt-get -qq -y install gcc graphviz
!pip install yappi gprof2dot
%%bq query -n get_github_mit_licences
SELECT * FROM `bigquery-public-data.github_repos.licenses`
WHERE license = 'mit'
import yappi
import google.datalab.bigquery as bq
import google.datalab.storage as storage
import sys



def testFunction1():
  get_github_mit_licences.execute(output_options=bq.QueryOutput.dataframe(use_cache=False)).result()
def testFunction2():
  get_github_mit_licences.execute(output_options=bq.QueryOutput.file(path='gs://partheniou/test1', use_cache=False)).result()
  test = storage.Bucket('partheniou').object('test1').download()
print('Page size is 1024')
yappi.clear_stats()
yappi.start()
stats = %prun -q -r testFunction1()
stats.stream = sys.stdout 
stats.sort_stats('time').print_stats(10)
yappi.stop()
yappi.get_func_stats().save('page_size_1024.callgrind', type='callgrind')
!gprof2dot --format=callgrind --output=page_size_1024.dot /content/page_size_1024.callgrind
!dot -Tpng page_size_1024.dot -o page_size_1024.png
yappi.clear_stats()

Output

Page size is 1024
          25410 function calls (25371 primitive calls) in 1058.539 seconds

   Ordered by: internal time
   List reduced from 465 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       14 1050.237   75.017 1050.237   75.017 {method 'acquire' of 'thread.lock' objects}
     2396    7.365    0.003    7.365    0.003 {method 'read' of '_ssl._SSLSocket' objects}
        2    0.407    0.204    0.407    0.204 {method 'read' of 'file' objects}
        3    0.130    0.043    0.130    0.043 {_socket.getaddrinfo}
        3    0.128    0.043    0.128    0.043 {method 'do_handshake' of '_ssl._SSLSocket' objects}
        3    0.118    0.039    0.118    0.039 {method 'connect' of '_socket.socket' objects}
        1    0.044    0.044 1058.539 1058.539 <ipython-input-4-9915fbe25556>:8(testFunction1)
        1    0.016    0.016    0.016    0.016 {posix.read}
      105    0.014    0.000    7.412    0.071 socket.py:406(readline)
     2396    0.010    0.000    7.390    0.003 ssl.py:707(recv)

[call graph image: page_size_1024]

print('Copy to GCS')
yappi.clear_stats()
yappi.start()
stats = %prun -q -r testFunction2()
stats.stream = sys.stdout 
stats.sort_stats('time').print_stats(10)
yappi.stop()
yappi.get_func_stats().save('copy_to_gcs.callgrind', type='callgrind')
!gprof2dot --format=callgrind --output=copy_to_gcs.dot /content/copy_to_gcs.callgrind
!dot -Tpng copy_to_gcs.dot -o copy_to_gcs.png
yappi.clear_stats()

Output:

Copy to GCS
          66346 function calls (66295 primitive calls) in 132.214 seconds

   Ordered by: internal time
   List reduced from 480 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       14  122.042    8.717  122.042    8.717 {method 'acquire' of 'thread.lock' objects}
     7545    7.927    0.001    7.927    0.001 {method 'read' of '_ssl._SSLSocket' objects}
        4    0.783    0.196    0.783    0.196 {method 'read' of 'file' objects}
        3    0.446    0.149    0.446    0.149 {posix.waitpid}
        4    0.274    0.068    0.274    0.068 {_socket.getaddrinfo}
        4    0.169    0.042    0.169    0.042 {method 'connect' of '_socket.socket' objects}
        4    0.144    0.036    0.144    0.036 {method 'do_handshake' of '_ssl._SSLSocket' objects}
      179    0.067    0.000    5.597    0.031 socket.py:336(read)
     7545    0.053    0.000    7.997    0.001 ssl.py:597(read)
     7545    0.051    0.000    8.061    0.001 ssl.py:707(recv)

[call graph image: copy_to_gcs]

Now using the latest master with the changes from PR #220

print('Page size is 100000')
yappi.start()
stats = %prun -q -r testFunction1()
stats.stream = sys.stdout 
stats.sort_stats('time').print_stats(10)
yappi.stop()
yappi.get_func_stats().save('page_size_100000.callgrind', type='callgrind')
!gprof2dot --format=callgrind --output=page_size_100000.dot /content/page_size_100000.callgrind
!dot -Tpng page_size_100000.dot -o page_size_100000.png
yappi.clear_stats()

Output:

Page size is 100000
          29438 function calls (29399 primitive calls) in 226.247 seconds

   Ordered by: internal time
   List reduced from 465 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       14  219.862   15.704  219.862   15.704 {method 'acquire' of 'thread.lock' objects}
     2789    5.244    0.002    5.244    0.002 {method 'read' of '_ssl._SSLSocket' objects}
        2    0.416    0.208    0.416    0.208 {method 'read' of 'file' objects}
        3    0.202    0.067    0.202    0.067 {method 'do_handshake' of '_ssl._SSLSocket' objects}
        3    0.182    0.061    0.182    0.061 {method 'connect' of '_socket.socket' objects}
        3    0.135    0.045    0.135    0.045 {_socket.getaddrinfo}
        1    0.082    0.082  226.247  226.247 <ipython-input-4-1f9731a7a829>:8(testFunction1)
      139    0.019    0.000    5.305    0.038 socket.py:406(readline)
     2789    0.014    0.000    5.276    0.002 ssl.py:707(recv)
        3    0.013    0.004    0.013    0.004 {method 'load_verify_locations' of '_ssl._SSLContext' objects}

[call graph image: page_size_100000]

There is a significant performance improvement if we increase the page size to 100000. Note that the process of exporting data from BigQuery to GCS followed by download still seems to be faster on my local setup outside of GCE.

@yebrahim (Contributor) commented Apr 4, 2017

Thanks for the analysis, here are my thoughts:

  • I'm not sure using yappi to analyze parallel execution time adds much here, since our code is sequential. It sends http requests in a loop and adds the results to the dataframe. There might be a point to be made here about making these requests in parallel, but I'm almost sure this isn't going to get us much, due to the next point.
  • Using the table data list API is always going to be slower than having the service export the data to GCS and downloading it. We need to think about how we can do this in a transparent way, since we need an intermediate GCS bucket.
  • In order to have a fair comparison, one step is missing from the analysis: loading the data from the downloaded blob into a dataframe, which the to_dataframe method also takes care of.

@yebrahim (Contributor) commented Apr 4, 2017

One more thing: it's probably better to analyze the runtime of Table.to_dataframe rather than Query.execute(output_options=bq.QueryOutput.dataframe()).result(), because we're not interested in the query execution time. The table-to-dataframe conversion is where all the Datalab logic is.
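
For example (a hedged sketch, assuming result() on a table-output query yields a table-like object that exposes to_dataframe(); the query mirrors the one used earlier in this thread):

import sys
import google.datalab.bigquery as bq

# Materialize the query results first, then profile only the table-to-dataframe step.
tbl = bq.Query('SELECT * FROM `bigquery-public-data.github_repos.licenses` WHERE license = "mit"') \
        .execute(output_options=bq.QueryOutput.table(use_cache=False)).result()

stats = %prun -q -r tbl.to_dataframe()
stats.stream = sys.stdout
stats.sort_stats('time').print_stats(10)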

@yebrahim (Contributor) commented Apr 4, 2017

Here's a bit of my analysis to complete the picture. The numbers were all run on a GCE n1-standard-1 (1 vCPU, 3.75 GB memory) VM.

GCS Method (base line):
Table: bigquery-public-data:noaa_gsod.gsod1944
Step 1: Extraction, using bq extract CLI tool

Compression / Format    CSV         JSON        AVRO
Compressed              86 s        86 s        N/A
Uncompressed            97.75 s     103.5 s     84.5 s

Step 2: Downloading, using gsutil CLI tool

Compression / Format    CSV         JSON        AVRO
Uncompressed            1.92 s      2.26 s      1.82 s

Step 3: Import to dataframe, using pandas.read_[csv|json]

Compression / Format    CSV         JSON        AVRO
Uncompressed            <1 s        14.43 s     N/A

Total:

Compression / Format    CSV         JSON        AVRO
Uncompressed            100.57 s    120.16 s    N/A
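
For reference, the baseline steps correspond roughly to the following notebook cell (a sketch; the bucket and file names are placeholders):

# Step 1: export the table from BigQuery to GCS as CSV using the bq CLI
!bq extract --destination_format=CSV bigquery-public-data:noaa_gsod.gsod1944 gs://my-bucket/gsod1944.csv
# Step 2: download the exported file with gsutil
!gsutil cp gs://my-bucket/gsod1944.csv /content/gsod1944.csv
# Step 3: load it into a dataframe
import pandas as pd
df = pd.read_csv('/content/gsod1944.csv')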

Datalab Method:
Page size: 1024

Table       # of Rows    Pages    Avg page fetch (s)    Wall time (s)    Diff (s)
gsod1929    2,081        3        0.64                  2.25             0.33
gsod1930    7,285        8        0.61                  5.47             0.59
gsod1933    18,248       18       0.58                  11.22            0.78
gsod1936    51,360       51       0.64                  33.58            0.94
gsod1939    65,650       65       0.65                  43.43            1.18
gsod1940    90,956       89       0.72                  65.44            1.36
gsod1941    111,750      110      0.80                  89.47            1.47
gsod1943    224,853      220      1.16                  258.42           3.22
gsod1944    246,069      241      1.22                  297.44           3.42

(Diff ≈ wall time minus pages × average page fetch time.)

Page size: 100,000

Table       # of Rows    Pages    Avg page fetch (s)    Wall time (s)    Diff (s)
gsod1929    2,081        1        1.41                  1.76             0.35
gsod1930    7,285        1        3.35                  3.95             0.60
gsod1933    18,248       2        3.84                  8.34             0.66
gsod1936    51,360       5        4.61                  23.87            0.82
gsod1939    65,650       7        4.43                  31.93            0.92
gsod1940    90,956       9        4.67                  43.57            1.54
gsod1941    111,750      11       4.67                  52.59            1.22
gsod1943    224,853      21       5.34                  113.81           1.67
gsod1944    246,069      23       5.14                  120.38           2.16

[image]

@yebrahim (Contributor) commented Apr 4, 2017

It's worth noting that the average page fetch time increases as the number of pages increases, which is interesting. Maybe we need to dig deeper into this.

[image]

Other than that, the numbers seem reasonable and on par with the discussion above. The low-hanging fruit here is to increase the page size, which should give us an easy 2x speedup. This pending PR and its discussion are relevant here.

For longer term, we should look into avoiding the table list data API altogether.

@parthea (Contributor) commented Apr 4, 2017

I'm not sure using yappi to analyze parallel execution time adds much here, since our code is sequential. It sends http requests in a loop and adds the results to the dataframe. There might be a point to be made here about making these requests in parallel, but I'm almost sure this isn't going to get us much, due to the next point.
Using the table data list API is always going to be slower than having the service export the data to GCS and downloading it. We need to think about how we can do this in a transparent way, since we need an intermediate GCS bucket.

Nice analysis! It was very easy to follow along. It's interesting that the BigQuery-to-GCS method only becomes beneficial at roughly 150,000+ rows (depending on the number of columns). Thanks for providing clarification about how the API requests are made. It's very helpful!

In order to have a fair comparison, one step is missing in the analysis, which is loading the data from the downloaded blob into a dataframe, which the to_dataframe method takes care of as well.
Other than that, the numbers seem reasonable, and on par with the discussion above.

I apologize for the omission on my part. I found that the run time for pd.read_csv() was negligible but I didn't document that finding here. I agree that the numbers seem reasonable for the to_dataframe() method.

It's worth noting the average page fetch increases drastically as the number of pages increases, which is interesting. Maybe we need to dig deeper into this.

Yes, this is surprising. Do you think it would be helpful to profile memory usage to look for a memory leak? I would guess that, for a given page_size, memory usage should be consistent across requests.
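
If it helps, something along these lines could be used to watch peak memory across repeated fetches (a sketch; memory_profiler is an extra package, not part of Datalab):

!pip install memory_profiler
%load_ext memory_profiler
# Repeat the same fetch (testFunction1 defined above); a steadily growing peak across runs would hint at a leak.
%memit testFunction1()
%memit testFunction1()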

For longer term, we should look into avoiding the table list data API altogether.

Would this be something similar to the BigQuery to GCS method? If not, can you share some information about the available options?

@Di-Ku (Contributor, Author) commented Apr 4, 2017

Thanks Anthoniou and Yasser for the detailed analysis and diagnosis.
Let's get this information in the docs here before closing.

@yebrahim (Contributor) commented Apr 4, 2017

Do you think it would be helpful to profile memory usage to look for a memory leak? I would guess that regardless of the page_size the memory usage should be consistent across the requests of the same page_size.

One thing pointed out by @craigcitro is worth investigating: we use df.append on every page, meaning that an ever-growing object gets copied every time. Instead, we should look for a way to group all the dataframe pieces together without unnecessary copying. I'm looking into the docs to see if/how we can do this.
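
An illustrative sketch of the two patterns (not the pydatalab code itself; the list of small frames is a stand-in for fetched pages):

import pandas as pd

pages = [pd.DataFrame({'col': range(1000)}) for _ in range(50)]  # stand-in for fetched pages

# Pattern 1: df.append copies the ever-growing frame on every iteration.
df_slow = pd.DataFrame()
for page in pages:
    df_slow = df_slow.append(page, ignore_index=True)

# Pattern 2: collect the pieces and concatenate them once at the end.
df_fast = pd.concat(pages, ignore_index=True)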

@yebrahim (Contributor) commented Apr 5, 2017

@parthea sorry I missed your last question.

Would this be something similar to the BigQuery to GCS method? If not, can you share some information about the available options?

I was thinking of something like a google.datalab.storage.Object.to_dataframe method. We can already save a Table/Query to a GCS file; we need a way to import from there into a dataframe.
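
Something along these lines, sketched as a notebook helper rather than an actual API (the helper name and bucket/object names are hypothetical):

import pandas as pd
from io import BytesIO
import google.datalab.storage as storage

def object_to_dataframe(bucket_name, object_name):
  # Download the GCS object's contents and parse them as CSV.
  data = storage.Bucket(bucket_name).object(object_name).download()
  return pd.read_csv(BytesIO(data))

df = object_to_dataframe('my-bucket', 'exported_query_results.csv')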

@yebrahim (Contributor) commented Apr 5, 2017

Here's an updated graph using pandas.concat instead of df.append on every page. It shaves off another 10% for large tables, which is a modest but worthwhile gain considering the change is cheap.

[image]

Opened #339 for these fixes.

yebrahim added a commit that referenced this issue Apr 5, 2017
Use large page size for downloading entire tables
Concatenate paged dataframes instead of appending
Follow up from the discussion on #329.
Replaces #220