
XML files processor optimization


XML files are small (~100 kB per file), but the ingest, transform, and dump-to-database pipeline is extremely slow.

Two improvements can be made to the code:

Bottleneck 1:

Because of errors, using psycopg2 to connect to PostgreSQL across nodes is not easy. Originally, entries in the PostgreSQL database were updated by extracting DataFrame fields as plain strings and saving them to a dictionary:

    def update_patient_info(info, key):
        <sql_queries>

    # Pull each field out of the DataFrame on the driver, one .first() call at a time
    info = {'stage': xml_schema.first()['shared_stage:stage_event']['shared_stage:pathologic_stage']._VALUE,
            'primary_site': xml_schema.first()['clin_shared:tumor_tissue_site']._VALUE,
            'gender': xml_schema.first()['shared:gender']._VALUE}
    update_patient_info(info, key=f.caseid)

Each extraction is treated as one Spark task and takes roughly 1 s.
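For concreteness, a minimal sketch of what the per-row update might look like; the table and column names are assumptions, since the actual SQL queries are omitted above:

    import psycopg2

    def update_patient_info(info, key):
        # Hypothetical schema: a `patients` table keyed by case_id (assumption, not the real DDL)
        conn = psycopg2.connect(dbname='gdc', user='user', password='password', host='host')
        cur = conn.cursor()
        cur.execute(
            "UPDATE patients SET stage = %s, primary_site = %s, gender = %s WHERE case_id = %s",
            (info['stage'], info['primary_site'], info['gender'], key),
        )
        conn.commit()
        conn.close()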

Total time for processing 100 files: 863.72 s

Solution 1:

As suggested by this question, map each partition to a PostgreSQL-interfacing method and connect to PostgreSQL with psycopg2 inside each partition:

  def update_patient_info(rows):
      # Runs once per partition on a worker, so psycopg2 is imported there
      import psycopg2
      conn = psycopg2.connect(tokens)
      cur = conn.cursor()
      for row in rows:
          cur.execute(query, row)
          conn.commit()
          ...

  xml_schema_rdd.foreachPartition(update_patient_info)

(Make sure psycopg2 is available on the worker nodes.)

Total time for processing 100 files: 367.40 s

Solution 2:

cur.executemany(query, rows)
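A minimal sketch of how executemany could replace the per-row loop inside the partition function; `tokens` and `query` are the same placeholders as above:

    def update_patient_info(rows):
        import psycopg2
        conn = psycopg2.connect(tokens)
        cur = conn.cursor()
        # Hand the whole partition to psycopg2 in one call instead of a Python-level loop
        cur.executemany(query, list(rows))
        conn.commit()
        conn.close()

    xml_schema_rdd.foreachPartition(update_patient_info)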

Total time for processing 100 files: 356.62 s

Solution 3:

from psycopg2 import extras
extras.execute_batch(cur, query, rows)

(The submodule has to be imported inside each partition.)
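Roughly, inside the same partition function; `page_size` is optional and is shown only to illustrate the knob execute_batch exposes:

    def update_patient_info(rows):
        import psycopg2
        from psycopg2 import extras   # imported on the worker, inside the partition
        conn = psycopg2.connect(tokens)
        cur = conn.cursor()
        # execute_batch groups the parameter sets into fewer server round trips
        extras.execute_batch(cur, query, list(rows), page_size=100)
        conn.commit()
        conn.close()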

Total time for processing 100 files: 374.74 s

Conclusion:

  1. Never extract information from a DataFrame/RDD on the driver.

  2. Batch execution doesn't help much (yet).

Bottleneck 2:

The batch optimization above didn't help because the files are processed one by one, so only one row is sent to the database at a time.

Ideally we would dispatch each file to a type-specific reader, but the following is not achievable:

  def xml_reader(f):
      df.select()
      ...

  def csv_reader(f):
      df.write.format('jdbc')
      ...

  readers = {"XML": xml_reader, "CSV": csv_reader}

  files_df.foreachPartition(readers[files_df.filetype])

It fails with:

_pickle.PicklingError: Could not serialize object: 
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation.
SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

We need a different approach.

Solution 1: Naive Map

  def xml_reader(f):
      df.select()
      ...

  def csv_reader(f):
      df.write.format('jdbc')
      ...

  readers = {"XML": xml_reader, "CSV": csv_reader}

  flist = df.rdd.collect()
  list(map(lambda x: readers[x.filetype](x), flist))

Total time for processing 100 files: similar to a plain for-loop; not parallelized at all.

Solution 2: Multi-process map

    from collections import deque
    from multiprocessing import Pool

    pool = Pool(processes=8)
    deque(pool.map(lambda x: inner(spark, x), filelist))

Returns:

AttributeError: Can't pickle local object 'process_xml.<locals>.<lambda>'

Solution 3: Switch RDD type; use a Python XML reader

Extract the file list as an RDD, and use mapPartitions() to parse the files within each partition.

Connect to Amazon S3 with boto3 and parse the XML files with Python's built-in xml.etree (instead of spark.read and spark-xml).
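A minimal sketch of this approach; the bucket name, key list, and the tags being pulled out are assumptions for illustration:

    def parse_partition(keys):
        # Imports run on the worker, once per partition
        import boto3
        import xml.etree.ElementTree as ET

        s3 = boto3.client('s3')
        wanted = ('pathologic_stage', 'tumor_tissue_site', 'gender')
        for key in keys:
            body = s3.get_object(Bucket='my-gdc-bucket', Key=key)['Body'].read()
            root = ET.fromstring(body)
            fields = {}
            for elem in root.iter():
                tag = elem.tag.split('}')[-1]   # strip the XML namespace
                if tag in wanted:
                    fields[tag] = (elem.text or '').strip()
            yield (key, fields.get('pathologic_stage'),
                   fields.get('tumor_tissue_site'), fields.get('gender'))

    rows_rdd = spark.sparkContext.parallelize(file_keys).mapPartitions(parse_partition)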

Total time for processing 1080 files: 107.45 s

Now that each partition holds many rows, we can try

from psycopg2 import extras
extras.execute_batch(cur, query, rows)

again.
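Putting the two pieces together, roughly; `conn_params` and `query` are placeholders for the real connection settings and UPDATE statement:

    def load_partition(keys):
        import psycopg2
        from psycopg2 import extras

        rows = list(parse_partition(keys))       # reuse the parser sketched above
        conn = psycopg2.connect(**conn_params)   # placeholder connection settings
        cur = conn.cursor()
        # A few batched round trips per partition instead of one commit per file
        extras.execute_batch(cur, query, rows)
        conn.commit()
        conn.close()

    spark.sparkContext.parallelize(file_keys).foreachPartition(load_partition)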

Total time for processing 1080 files: 88.00 s

Conclusion:

Processing is roughly 100x faster per file (~8.6 s/file before vs. ~0.08 s/file now).

When the two conflict, parallelize only what actually needs to be parallelized.

Possible concerns:

boto3 and psycopg2 may become the bottleneck.

HDFS and Hive might be better supported, and would remove the need for an Internet connection.