No option to append to tables instead of truncating them #8

Open
ziedbouf opened this issue Sep 4, 2018 · 7 comments

Comments

ziedbouf commented Sep 4, 2018

Thanks for the project! It helps with the slow DataFrame to_sql process, and it's more straightforward to proceed with odoo or other libraries.

For now I have an issue: I am doing the DataFrame copy inside a for loop, but it seems to overwrite the table each time it pushes data.

# Imports inferred from the snippet; TicToc is assumed to come from pytictoc.
# `fs` (a filesystem object such as s3fs), `flat_list_of_bucketes`,
# `total_pushed_rows` and `total_num_of_files` are defined elsewhere in my script.
import datetime as dt
import platform
import uuid

import pandas as pd
from pytictoc import TicToc
from sqlalchemy import create_engine
from sqlalchemy.ext.automap import automap_base

from pandas_to_postgres import DataFrameCopy

connection_string = 'postgresql://postgres:test_password@localhost:5432/data'
sql_engine = create_engine(connection_string, echo=False, pool_size=10, max_overflow=-1)
Base = automap_base()
Base.prepare(sql_engine, reflect=True)
my_model = Base.metadata.tables['my_model']


for index, x in enumerate(flat_list_of_bucketes[10:]):
    # skip files that were already imported
    if len(pd.read_sql('SELECT * FROM imported_files WHERE file_path=%(file_path)s', con=sql_engine, params={'file_path': x['path']})) == 0:
        exec_by = 'hostname: ' + platform.node() + ', python_version: ' + platform.python_version() + ', created_at: ' + dt.datetime.utcnow().strftime('%d-%m-%YT%H:%M')
        now = dt.datetime.utcnow()
        t = TicToc()
        print('start reading the file: ' + x['path'])
        with t, fs.open(x['path']) as f:
            df = pd.read_csv(f, compression='gzip', header=0, low_memory=False)
            t.toc('reading data takes ', restart=True)
            df['id'] = [uuid.uuid1() for _ in range(len(df.index))]
            df['created_by'] = exec_by
            df['created_at'] = now
            t.toc('cleansing data takes: ', restart=True)

            with sql_engine.connect() as c:
                DataFrameCopy(df.copy(), conn=c, table_obj=my_model)
            t.toc('data save to postgres in: ')
            total_pushed_rows += len(df)
            print('Total pushed rows: ' + str(total_pushed_rows))
            df_meta_info = pd.DataFrame(data={'file_path': [x['path']],
                                              'imported_at': [now],
                                              'imported_by': [exec_by],
                                              'meta_data': [str(x)]})

            df_meta_info.to_sql('imported_files', con=sql_engine, if_exists='append', index=False)
            t.toc('pushing metadata to postgres takes ', restart=True)
            print('file was saved to the database: ' + df_meta_info['file_path'][0])
    print('total % of processed files: ' + str(100 * index / total_num_of_files) + '%')

I'm trying to figure out whether I'm missing an option to append, similar to to_sql, or whether I need to manage the commit myself, but I have no clue for now. Any help on how to solve this?

ziedbouf (Author) commented Sep 4, 2018

Fixed the issue by overriding the copy functions; I wasn't expecting truncate to delete the full contents of the table.

class DontTruncateDataFrameCopy(DataFrameCopy):
    def truncate(self):
        pass

    def create_pk(self):
        pass

    def create_fks(self):
        pass

    def drop_fks(self):
        pass

    def drop_pk(self):
        pass
        
    def copy_from_file(self, file_object):
        """
        COPY to PostgreSQL table using StringIO CSV object
        Parameters
        ----------
        file_object: StringIO
            CSV formatted data to COPY from DataFrame to PostgreSQL
        """
        cur = self.conn.connection.cursor()
        file_object.seek(0)
        columns = file_object.readline()
        sql = "COPY {table} ({columns}) FROM STDIN WITH  CSV".format(
            table=self.sql_table, columns=columns
        )
        cur.copy_expert(sql=sql, file=file_object)

    def copy(self, functions=[cast_pandas]):
        self.drop_fks()
        self.drop_pk()
        self.df = self.data_formatting(self.df, functions=functions)
        with self.conn.begin():
            self.truncate()

            self.logger.info("Creating generator for chunking dataframe")
            for chunk in df_generator(self.df, self.csv_chunksize):

                self.logger.info("Creating CSV in memory")
                fo = create_file_object(chunk)

                self.logger.info("Copying chunk to database")
                self.copy_from_file(fo)
                del fo

            self.logger.info("All chunks copied ({} rows)".format(self.rows))

        self.create_pk()
        self.create_fks()
        self.analyze()

for index, x in enumerate(flat_list_of_bucketes[10:]):
    if len(pd.read_sql('SELECT * FROM imported_files WHERE file_path=%(file_path)s', con=sql_engine, params={'file_path': x['path']})) == 0:
        exec_by = 'hostname: ' + platform.node() + ', python_version: ' + platform.python_version() + ', created_at: ' + dt.datetime.utcnow().strftime('%d-%m-%YT%H:%M')
        now = dt.datetime.utcnow()
        t = TicToc()
        print('start reading the file: ' + x['path'])
        with t, fs.open(x['path']) as f:
            df = pd.read_csv(f, compression='gzip', header=0, low_memory=False)
            t.toc('reading data takes ', restart=True)
            df['id'] = [uuid.uuid1() for _ in range(len(df.index))]
            df['created_by'] = exec_by
            df['created_at'] = now
            t.toc('cleansing data takes: ', restart=True)

            with sql_engine.connect() as c:
                # now using the subclass that skips truncate
                DontTruncateDataFrameCopy(df.copy(), conn=c, table_obj=my_model)
            t.toc('data save to postgres in: ')
            total_pushed_rows += len(df)
            print('Total pushed rows: ' + str(total_pushed_rows))
            df_meta_info = pd.DataFrame(data={'file_path': [x['path']],
                                              'imported_at': [now],
                                              'imported_by': [exec_by],
                                              'meta_data': [str(x)]})

            df_meta_info.to_sql('imported_files', con=sql_engine, if_exists='append', index=False)
            t.toc('pushing metadata to postgres takes ', restart=True)
            print('file was saved to the database: ' + df_meta_info['file_path'][0])
    print('total % of processed files: ' + str(100 * index / total_num_of_files) + '%')

Can you explain why both truncate() and drop_pk()/drop_fks() are needed?

makmanalp (Contributor) commented

@ziedbouf Apologies for the late reply - we seem to have missed this!

Our use case is mostly importing data from scratch into Postgres, so for us it was a natural default choice, but I see how it doesn't make sense for a lot of other use cases! Overriding truncate() with an empty function is a decent option for you here.

The reason we drop the keys and re-create them is that indexes on the table slow down insertion considerably, especially during bulk loads, so much so that dropping them and regenerating them at the end is often much faster for large loads.
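
In raw SQL terms the pattern is roughly the following (illustration only, with a made-up index name, reusing the engine from the snippet above):

from sqlalchemy import text

# Drop the index before the bulk COPY and rebuild it afterwards; rebuilding
# once is usually cheaper than maintaining the index row-by-row during the load.
with sql_engine.begin() as conn:
    conn.execute(text('DROP INDEX IF EXISTS my_model_created_at_idx'))  # hypothetical index name
    # ... bulk COPY the data here ...
    conn.execute(text('CREATE INDEX my_model_created_at_idx ON my_model (created_at)'))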

In your scenario, if you're importing many source tables into one destination, of course dropping and recreating indexes after loading each source table doesn't make sense. One way to do this could be to load all the source tables beforehand and concatenate them into one large dataframe with pd.concat, and then load that one. The other way would be to look at the implementation of DataFrameCopy here:

def copy(self, functions=[cast_pandas]):

and instead write your own child class of BaseCopy that drops keys, loads multiple tables, then recreates them. The code for that part isn't overly complex. Good luck!
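
A very rough, untested sketch of that idea, reusing the method names from the copy() implementation quoted above (the class name, method name, and import line here are just placeholders, not part of the library):

# Hypothetical child class: drop keys once, COPY several already-loaded
# DataFrames without truncating, then recreate keys and analyze at the end.
# Import path is an assumption; adjust to wherever the helpers live.
from pandas_to_postgres import DataFrameCopy, cast_pandas, create_file_object, df_generator


class MultiFrameAppendCopy(DataFrameCopy):
    def copy_frames(self, frames, functions=[cast_pandas]):
        self.drop_fks()
        self.drop_pk()
        with self.conn.begin():
            for df in frames:  # iterable of already-loaded DataFrames
                df = self.data_formatting(df, functions=functions)
                for chunk in df_generator(df, self.csv_chunksize):
                    fo = create_file_object(chunk)
                    self.copy_from_file(fo)
                    del fo
        self.create_pk()
        self.create_fks()
        self.analyze()

# usage sketch: MultiFrameAppendCopy(first_df, conn=c, table_obj=my_model).copy_frames(list_of_dfs)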

makmanalp changed the title from "Dataframe copy to postgres overwrite tables?" to "No option to append to tables instead of truncating them" on Sep 11, 2018
makmanalp (Contributor) commented

(keeping this ticket open as a reminder to implement support for this)

ziedbouf (Author) commented

Thanks @makmanalp. In my case I need to load multiple large files, and I cannot consolidate these CSV files into one pandas DataFrame. In that case, truncate wipes the table each time copy() runs.

I agree that tweaking the copy(self, functions=[cast_pandas]) function would be a more elegant implementation; I will give it a look as soon as possible.

makmanalp (Contributor) commented

More notes for us: perhaps we could support read_csv(chunksize=blah) / TextFileReader objects by doing a chunked load on that end too, or a dask dataframe, using an implementation similar to this one but calling next() on the iterator rather than doing reads with start/stop (the reason we do start/stop reads for HDF is that we find it's somehow faster):

df = pd.read_hdf(self.file_name, key=hdf_table, start=start, stop=stop)
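
For example, something like this on the CSV side (sketch only; append_chunk is a placeholder for whatever non-truncating copy ends up being used):

# pandas' TextFileReader (returned when chunksize is set) is an iterator of
# DataFrames, so a chunked load can just advance it instead of start/stop reads.
import pandas as pd

reader = pd.read_csv('big_table.csv.gz', compression='gzip', chunksize=500_000)
for chunk in reader:  # each `chunk` is a DataFrame; equivalent to repeatedly calling next(reader)
    append_chunk(chunk)  # hypothetical: COPY this chunk into the destination table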

ziedbouf (Author) commented Sep 11, 2018

I agree with this. However, I think the chunk size should be tuned between too small and too big to get the best performance. I found the following article interesting; the author states:

The chunksize should not be too small. If it is too small, the IO cost will be high to overcome the benefit.

What's the best approach for scaling the chunk size based on the overall size of the data? It might be something to explore.
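
For example, one rough starting point (not benchmarked; the 100 MB target is arbitrary) could be to derive the chunk size from a per-chunk memory budget measured on a sample DataFrame:

import pandas as pd

def estimate_chunksize(sample_df: pd.DataFrame, target_bytes: int = 100 * 1024 ** 2) -> int:
    """Pick a row count whose in-memory size is roughly target_bytes."""
    bytes_per_row = sample_df.memory_usage(deep=True).sum() / max(len(sample_df), 1)
    rows = int(target_bytes / max(bytes_per_row, 1))
    return max(10_000, min(rows, 5_000_000))  # clamp to sane bounds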

licht1stein commented

Made a PR based on code above for similar use cases:
#10
