# Jupyter Notebook
#### Get SAF data from .csv files and upload them to the remote OpenDose database

# Import libraries

In [1]:
import sqlalchemy
import pandas as pd

# Create functions to handle the database

In [2]:
def connect(server, user, password, db, host='localhost'):
    '''Create an engine for the database and reflect its existing tables.

    Parameters
    ----------
    server : str
        URL scheme passed to SQLAlchemy, e.g. 'postgresql'.
    user, password : str
        Credentials. They are percent-encoded before being placed in the URL,
        so characters such as '@', ':' or '/' cannot corrupt it.
    db : str
        Database name.
    host : str, optional
        Host name, optionally 'host:port'; defaults to 'localhost'.

    Returns
    -------
    con : the engine returned by sqlalchemy.create_engine()
    meta : sqlalchemy.MetaData populated by reflecting the database tables
    '''
    from urllib.parse import quote

    # We connect with the help of the URL, e.g.
    # postgresql://postgres:postgres@localhost:5432/opendose
    url = '{}://{}:{}@{}/{}'.format(
        server, quote(str(user), safe=''), quote(str(password), safe=''), host, db)

    # The return value of create_engine() is our connection object
    con = sqlalchemy.create_engine(url)
    # Reflect with an explicit bind: MetaData(bind=...) is deprecated in
    # SQLAlchemy 1.4 and removed in 2.0, while reflect(bind=...) works in both.
    meta = sqlalchemy.MetaData()
    meta.reflect(bind=con)

    return con, meta

def df_single_sql_query(df, col_id, table, con):
    '''Return the col_id values of the rows of `table` matching a one-row DataFrame.

    Every column of `df` other than `col_id` becomes an equality condition
    (column = value), and the conditions are ANDed together. Values are sent
    as bound parameters, so quotes or non-ASCII characters in them (e.g. in
    contact names) cannot break the statement or inject SQL.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain exactly one row; its column names must be columns of `table`.
    col_id : str
        Column whose values are returned (typically the primary key).
    table : str
        Table name. `col_id`, `table` and the column names of `df` are
        inserted verbatim into the SQL, so they must come from trusted code.
    con : engine returned by connect()

    Returns
    -------
    list or None
        The matching `col_id` values (an empty list when no row matches),
        or None when `df` does not contain exactly one row.
    '''
    # allow only single df line query
    if len(df.index) != 1:
        return None

    conditions = []
    params = {}
    for position, col in enumerate(df.columns):
        # compare names by value: `is not` on strings depends on interning
        if col == col_id:
            continue
        value = df[col].iloc[0]
        # unwrap numpy scalars (e.g. numpy.int64), which some DB drivers reject
        if hasattr(value, 'item'):
            value = value.item()
        param_name = 'p{}'.format(position)
        conditions.append('{} = :{}'.format(col, param_name))
        params[param_name] = value

    sql = 'SELECT {} FROM {}'.format(col_id, table)
    if conditions:
        sql += ' WHERE ' + ' AND '.join(conditions)

    # execute the query on the database table
    with con.connect() as conn:
        rows = conn.execute(sqlalchemy.text(sql), params).fetchall()
    # only col_id is selected, so it is the first (and only) field of each row
    return [row[0] for row in rows]

# Connect to the OpenDose database

In [3]:
import os
import getpass

# Credentials are taken from the environment (or prompted for) rather than
# written in the notebook, so the read-write password is never committed.
# Environment variables: OPENDOSE_DB_USER, OPENDOSE_DB_HOST, OPENDOSE_DB_PASSWORD.
# read-only user opendose
# con, meta = connect(server='postgresql', user='opendose', password='opendose', db='opendose', host='crctcalcul')
# read-write user postgres
db_user = os.environ.get('OPENDOSE_DB_USER', 'postgres')
db_host = os.environ.get('OPENDOSE_DB_HOST', '10.31.210.110')
db_password = os.environ.get('OPENDOSE_DB_PASSWORD') or getpass.getpass(
    'Password for {}@{}: '.format(db_user, db_host))
con, meta = connect(server='postgresql', user=db_user, password=db_password, db='opendose', host=db_host)
con

Engine(postgresql://postgres:***@10.31.210.110/opendose)

# Create the tables

In [4]:
# # # delete all tables
# # meta.drop_all(con) # ! Achtung !

# t_provenances = sqlalchemy.Table('t_provenances', meta,
#     Column('provenance_id', sqlalchemy.Integer, primary_key=True),
#     Column('provider', String, nullable=False),
#     Column('code', String, nullable=False),
#     Column('version', String, nullable=False),
#     Column('contact', String, nullable=False),
#     Column('email', String, nullable=False),
#     Column('date', Date, nullable=False)
# )

# t_phantoms = sqlalchemy.Table('t_phantoms', meta,
#     Column('phantom_id', Integer, primary_key=True),
#     Column('provider', String, nullable=False),
#     Column('reference', String, nullable=False),
#     Column('version', String, nullable=False),
#     Column('model', String, nullable=False),
#     Column('height', Float, nullable=False),
#     Column('mass', Float, nullable=False),
#     Column('size_x', Integer, nullable=False),
#     Column('size_y', Integer, nullable=False),
#     Column('size_z', Integer, nullable=False),
#     Column('res_x', Float, nullable=False),
#     Column('res_y', Float, nullable=False),
#     Column('res_z', Float, nullable=False)
# )

# t_regions = sqlalchemy.Table('t_regions', meta,
#     Column('region_id', Integer, primary_key=True),
#     Column('phantom_id', Integer, ForeignKey('t_phantoms.phantom_id'), nullable=False),
#     Column('region', Integer, nullable=False),
#     Column('name', String, nullable=False),
#     Column('volume_cm3', Float, nullable=False),
#     Column('mass_g', Float, nullable=False)
# )

# t_particles = sqlalchemy.Table('t_particles', meta,
#     Column('particle_id', Integer, primary_key=True),
#     Column('name', String, nullable=False)
# )

# t_safs = sqlalchemy.Table('t_safs', meta,
#     Column('provenance_id', Integer, ForeignKey('t_provenances.provenance_id'), nullable=False),
#     Column('source_id', Integer, ForeignKey('t_regions.region_id'), nullable=False),
#     Column('target_id', Integer, ForeignKey('t_regions.region_id'), nullable=False),
#     Column('particle_id', Integer, ForeignKey('t_particles.particle_id'), nullable=False),
#     Column('energy_MeV', Float, nullable=False),
#     Column('saf', Float, nullable=False),
#     Column('saf_std', Float, nullable=False),
#     Column('nb_primaries', Float, nullable=False)
# )

# # create all tables
# meta.create_all(con)

# Show the name of every table discovered when the metadata was reflected.
for table in meta.tables.keys():
    print(table)

t_phantoms
t_regions
t_provenances
t_particles
t_safs


In [5]:
# NOTE: reading a table by name loads all of it into memory. That is fine for
# the small t_provenances table, but not for large tables such as t_safs.
# con.execute('SELECT * FROM t_provenances').fetchall()
t_pro = pd.read_sql('t_provenances', con)
# display the ten most recent rows
t_pro.iloc[-10:]

Unnamed: 0,provenance_id,provider,code,version,contact,email,date
19,48,CRUK,PENELOPE,2014,Nadia Falzone,nadia.falzone@oncology.ox.ac.uk,2019-08-28
20,49,NPL,EGS++,2018,Ana Denis-Bacelar,ana.denisbacelar@npl.co.uk,2019-10-30
21,50,CRCT,Geant4,10.5,Maxime Chauvin,maxime.chauvin@inserm.fr,2019-04-01
22,51,IRSN,MCNPX,2.6c,Aurélie Desbrée,aurelie.desbree@irsn.fr,2020-01-10
23,52,IRSN,MCNPX,2.6c,Aurélie Desbrée,aurelie.desbree@irsn.fr,2020-01-15
24,53,IRSN,MCNPX,2.6c,Aurélie Desbrée,aurelie.desbree@irsn.fr,2020-01-14
25,56,PolSl,GATE,8.1,Damian Borys,damian.borys@polsl.pl,2020-06-13
26,54,PolSl,GATE,8.1,Damian Borys,damian.borys@polsl.pl,2020-06-11
27,55,PolSl,GATE,8.1,Damian Borys,damian.borys@polsl.pl,2020-06-12
28,57,SCK.CEN,PHITS,3.10,Jérémie Dabin,jeremie.dabin@sckcen.be,2020-06-10


# Fill the phantom, region and particle tables

In [20]:
# import pandas as pd

# # ICRP phantom table
# phantom = {'phantom_id':[1,2],
#            'provider':['ICRP','ICRP'],
#            'reference':['110','110'],
#            'version':['1.2','1.2'], 
#            'model':['AF','AM'],
#            'height':[1.63,1.76],'mass':[60.0,73.0],
#            'size_x':[299,254],'size_y':[137,127],'size_z':[348,222],
#            'res_x':[1.775,2.137],'res_y':[1.775,2.137],'res_z':[4.84,8.0]}
# phantoms = pd.DataFrame(phantom)
# phantoms.to_sql('t_phantoms', con, if_exists='append', index=False)

# # ICRP regions table
# regions_AF = pd.read_table('~/OpenDose/phantoms/ICRP_110_1.2/AF_regions.txt',sep='\t',engine='python')
# regions_AM = pd.read_table('~/OpenDose/phantoms/ICRP_110_1.2/AM_regions.txt',sep='\t',engine='python')
# regions_AF['phantom_id'] = 1
# regions_AM['phantom_id'] = 2
# regions_AF.to_sql('t_regions', con, if_exists='append', index=False)
# regions_AM.to_sql('t_regions', con, if_exists='append', index=False)

# # particle table
# particle = {'particle_id':[1,2], 'name':['photons','electrons']}
# particles = pd.DataFrame(particle)
# particles.to_sql('t_particles', con, if_exists='append', index=False)

# Insert SAF results into the database

In [21]:
# if the filename structure is not correct use the command
# find . -name "*.csv" | while read f; do rename -v 's/AM/AM_/' $f; done
# find . -name "*.csv" | while read f; do rename -v 's/\/AF/\/AF_/' $f; done
# find . -name "*.csv" | while read f; do rename -v 's/photons/_photons/' $f; done
# find . -name "*.csv" | while read f; do rename -v 's/electrons/_electrons/' $f; done
# sed -i 's|,|.|g' *.csv
# sed -i 's|;|,|g' *.csv

In [30]:
import os
import shutil
import datetime
import pandas as pd
import sqlalchemy
import sqlalchemy.exc
pd.set_option('display.width', 1000)

# Read the team information (columns: id, provider, contact, email).
# teams = pd.read_csv('/home/gate/OpenDose/members/contactpersons.csv',names=['id','provider','contact','email'])
teams = pd.read_csv('/home/gate/OpenDose/0-Collaboration/members/contactpersons.csv',
                    names=['id', 'provider', 'contact', 'email'])

# Number of primaries stored with every uploaded SAF value.
nb_primaries = 1e8

# Root directory of the SAF results. Layout expected below it:
#   <phantom provider>_<reference>_<version>/<code>_<code version>/<team>/_toadd/<model>_<region>_<particle>_SAF.csv
# rootdir = '/home/gate/OpenDose/SAFs/'
rootdir = '/home/gate/Downloads/OD_newSAFs/'


def _move_files(src_dir, dst_dir, names):
    '''Move the named files that exist in src_dir into dst_dir, creating dst_dir if needed.'''
    os.makedirs(dst_dir, exist_ok=True)
    for name in names:
        src = os.path.join(src_dir, name)
        if os.path.exists(src):
            shutil.move(src, os.path.join(dst_dir, name))


def _region_ids(con, phantom_id):
    '''Return {region number: region_id} for all regions of the given phantom (one query).'''
    sql = sqlalchemy.text('SELECT region, region_id FROM t_regions WHERE phantom_id = :phantom_id')
    with con.connect() as conn:
        rows = conn.execute(sql, {'phantom_id': phantom_id}).fetchall()
    return {int(region): region_id for region, region_id in rows}


def _particle_id(con, name):
    '''Return the particle_id of the named particle, or None if it is not in t_particles.'''
    sql = sqlalchemy.text('SELECT particle_id FROM t_particles WHERE name = :name')
    with con.connect() as conn:
        row = conn.execute(sql, {'name': name}).fetchone()
    return None if row is None else row[0]


def _region_number(label):
    '''Extract a region number from a column label by keeping its decimal digits (None if there are none).'''
    digits = ''.join(filter(str.isdecimal, str(label)))
    return int(digits) if digits else None


def _to_long(wide_df, regions, value_name):
    '''Turn a wide SAF table (index: energies, one column per target region) into
    rows with columns energy_MeV, target_id and `value_name`.

    Columns whose region number is not a region of the phantom are dropped;
    cells holding NaN are dropped by stack().
    Raises ValueError if no column matches a region of the phantom.
    '''
    target_ids = [regions.get(_region_number(col)) for col in wide_df.columns]
    keep = [target_id is not None for target_id in target_ids]
    if not any(keep):
        raise ValueError('no column matches a region of the phantom')
    kept = wide_df.loc[:, keep].copy()
    kept.columns = [target_id for target_id in target_ids if target_id is not None]
    kept = kept.rename_axis(index='energy_MeV', columns='target_id')
    return kept.stack().rename(value_name).reset_index()


region_cache = {}  # phantom_id -> {region number: region_id}

# scan the directory tree of the SAF results
for root, dirs, files in os.walk(rootdir):
    # only leaf '_toadd' directories hold files that still have to be uploaded
    if dirs or os.path.basename(root) != '_toadd':
        continue

    # ********************* information from the directory structure *********************
    provenance = os.path.relpath(root, rootdir).split(os.sep)
    if len(provenance) < 4:
        print('Error: unexpected directory layout, skipping', root)
        continue
    phantom_parts = provenance[0].split('_')
    code_parts = provenance[1].split('_')
    team = teams.loc[teams.provider == provenance[2]].drop('id', axis=1)
    if len(phantom_parts) < 3 or len(code_parts) < 2 or len(team.index) != 1:
        print('Error: cannot identify the phantom, code or team of', root, '- skipping it')
        continue
    parent_dir = os.path.dirname(root)
    corrupted_dir = os.path.join(parent_dir, '_corrupted')

    for file in files:
        if 'SAF.csv' not in file:
            continue
        saf_path = os.path.join(root, file)
        err_file = file.replace('SAF', 'SAFerr')
        err_path = os.path.join(root, err_file)
        print('processing file', saf_path, '...')

        # ********************* information from the filename *********************
        # expected: <model>_<region>_<particle>_SAF.csv
        params = file.split('_')
        if len(params) < 3:
            print('  Error: unexpected file name, expected <model>_<region>_<particle>_SAF.csv')
            continue
        model, source_region, particle_name = params[:3]

        # phantom_id, source_id and particle_id must already exist in the database
        phantom_df = pd.DataFrame({'provider': [phantom_parts[0]], 'reference': [phantom_parts[1]],
                                   'version': [phantom_parts[2]], 'model': [model]})
        phantom_id = df_single_sql_query(phantom_df, 'phantom_id', 't_phantoms', con)
        if not phantom_id:
            print('  Error: unknown phantom', provenance[0], model, '- file left in place')
            continue
        phantom_id = phantom_id[0]
        if phantom_id not in region_cache:
            region_cache[phantom_id] = _region_ids(con, phantom_id)
        regions = region_cache[phantom_id]
        source_id = regions.get(int(source_region)) if source_region.isdecimal() else None
        particle_id = _particle_id(con, particle_name)
        if source_id is None or particle_id is None:
            print('  Error: unknown source region', source_region, 'or particle', particle_name,
                  '- file left in place')
            continue

        # ********************* information from the SAF.csv and SAFerr.csv files *********************
        if not os.path.isfile(err_path):
            print('  Error: no SAFerr.csv file')
            print('  ...moving the SAF.csv to', corrupted_dir)
            _move_files(root, corrupted_dir, [file])
            continue
        try:
            saf_df = _to_long(pd.read_csv(saf_path, index_col=0), regions, 'saf')
            saferr_df = _to_long(pd.read_csv(err_path, index_col=0), regions, 'saf_std')
        except ValueError as err:  # pandas parse errors (ParserError, EmptyDataError) are ValueErrors
            print('  Error: the SAF.csv or SAFerr.csv cannot be read:', err)
            print('  ...moving them to', corrupted_dir)
            _move_files(root, corrupted_dir, [file, err_file])
            continue

        # merge the saferr df with the saf df (one SAF and one error per energy and target)
        try:
            df_db = pd.merge(saf_df, saferr_df, on=['energy_MeV', 'target_id'],
                             how='outer', validate='one_to_one')
        except ValueError:  # pandas.errors.MergeError subclasses ValueError
            print('Error: the SAF.csv and SAFerr.csv cannot be merged:')
            print('  the rows or columns are not homogeneous')
            print('  ...moving them to', corrupted_dir)
            _move_files(root, corrupted_dir, [file, err_file])
            continue

        # provenance = team + code/version, dated by the SAF file modification time;
        # registered only now so invalid files do not leave orphan provenances
        provenance_df = team.copy()
        provenance_df['code'] = code_parts[0]
        provenance_df['version'] = code_parts[1]
        file_time = os.path.getmtime(saf_path)
        provenance_df['date'] = datetime.datetime.fromtimestamp(file_time).strftime("%Y-%m-%d")
        provenance_id = df_single_sql_query(provenance_df, 'provenance_id', 't_provenances', con)
        if not provenance_id:
            provenance_df.to_sql('t_provenances', con, if_exists='append', index=False)
            provenance_id = df_single_sql_query(provenance_df, 'provenance_id', 't_provenances', con)
        if not provenance_id:
            print('  Error: the provenance could not be registered - file left in place')
            continue

        # add information to the dataframe for the database
        df_db['provenance_id'] = provenance_id[0]
        df_db['source_id'] = source_id
        df_db['particle_id'] = particle_id
        df_db['nb_primaries'] = nb_primaries
        try:
            # fill the database with SAF and SAFerr
            df_db.to_sql('t_safs', con, if_exists='append', index=False)
        except (sqlalchemy.exc.IntegrityError, sqlalchemy.exc.DataError):
            # constraint or value errors mean bad data; connection errors propagate
            print('Error: the database has refused the data')
            print('  The SAF.csv or SAFerr.csv may be corrupted (check column contents)')
            print('  ...moving them to', corrupted_dir)
            _move_files(root, corrupted_dir, [file, err_file])
            continue
        # uploaded: move both files out of _toadd so they are not processed twice
        _move_files(root, parent_dir, [file, err_file])

processing file /home/gate/Downloads/OD_newSAFs/ICRP_110_1.2/GATE_8.1/PolSl/_toadd/AF_153_electrons_SAF.csv ...
processing file /home/gate/Downloads/OD_newSAFs/ICRP_110_1.2/GATE_8.1/PolSl/_toadd/AM_148_photons_SAF.csv ...
processing file /home/gate/Downloads/OD_newSAFs/ICRP_110_1.2/GATE_8.1/PolSl/_toadd/AF_80_electrons_SAF.csv ...
processing file /home/gate/Downloads/OD_newSAFs/ICRP_110_1.2/GATE_8.1/PolSl/_toadd/AM_82_electrons_SAF.csv ...
processing file /home/gate/Downloads/OD_newSAFs/ICRP_110_1.2/GATE_8.1/PolSl/_toadd/AM_98_photons_SAF.csv ...
processing file /home/gate/Downloads/OD_newSAFs/ICRP_110_1.2/GATE_8.1/PolSl/_toadd/AF_135_electrons_SAF.csv ...
processing file /home/gate/Downloads/OD_newSAFs/ICRP_110_1.2/GATE_8.1/PolSl/_toadd/AF_141_photons_SAF.csv ...
processing file /home/gate/Downloads/OD_newSAFs/ICRP_110_1.2/GATE_8.1/PolSl/_toadd/AF_29_photons_SAF.csv ...
processing file /home/gate/Downloads/OD_newSAFs/ICRP_110_1.2/GATE_8.1/PolSl/_toadd/AF_14_photons_SAF.csv ...
process

# Clean the database

In [17]:
# SELECT COUNT(*) FROM t_safs; = 11177347 (2019-05-06)
# SELECT COUNT(*) FROM t_safs; = 12608488 (2020-01-06)
# SELECT COUNT(*) FROM t_safs; = 12624553 (2020-01-10)
# SELECT COUNT(*) FROM t_safs; = 12633905 (2020-01-15)
# SELECT COUNT(*) FROM t_safs; = 19604106 (2020-02-25)
# SELECT COUNT(*) FROM t_safs; = 19608810 (2020-06-23)

## The cleaning queries below use PostgreSQL-specific syntax (e.g. ctid, DELETE ... USING). TODO: write a generic SQL script

In [None]:
from sqlalchemy import *

print('Cleaning the database...')

# The destructive queries below are kept disabled; uncomment them deliberately.

# # ********************* remove provenances not referenced by any SAF *********************
# # ****************************************************************************************
# sql = '''
# DELETE FROM t_provenances a 
# WHERE NOT EXISTS 
# (SELECT * FROM t_safs b WHERE a.provenance_id = b.provenance_id)
# '''
# result = con.execute(sql)
# print (result.rowcount,'orphan provenances in t_provenances have been deleted.')

# # ********************* remove duplicate rows in t_safs *********************
# # ***************************************************************************
# # delete the rows whose ctid is in the list of duplicate ("bad") ctids (shorter query)
# sql = '''
# DELETE FROM t_safs a USING 
# (SELECT max(ctid) AS badctid FROM t_safs GROUP BY t_safs.* HAVING COUNT(*) > 1) b 
# WHERE a.ctid IN (badctid)
# '''
# result = con.execute(sql)
# print (result.rowcount,'duplicate rows in t_safs have been deleted.')
# 2019-03-25 there are 2067821 entries in the database
# 2019-04-04 there are 7812678 entries in the database

# # clean by code to be sure there is one entry per source, target, energy


# SQL queries

In [None]:
# # to see only a limited number of rows
# SELECT * FROM t_safs LIMIT 10;

# # check duplicates
# SELECT * FROM t_safs GROUP BY t_safs.* HAVING (COUNT(*) > 1);
# # check duplicates with different saf or saf_std
# SELECT provenance_id, source_id, target_id, particle_id, energy_MeV, nb_primaries FROM t_safs GROUP BY provenance_id, source_id, target_id, particle_id, energy_MeV, nb_primaries HAVING (COUNT(*) > 1);

# # delete entries
# DELETE FROM t_safs WHERE provenance_id=xxx;
# DELETE FROM t_provenances WHERE provenance_id=xxx;

# # reset the primary key auto increment value after deleting entries
# ALTER SEQUENCE t_provenances_provenance_id_seq RESTART WITH xxx;