# Data retrieval and cleaning

We import basic libraries

In [1]:
import pandas as pd
import numpy as np
from google.cloud import storage
import os
from pathlib import Path
from tqdm.std import tqdm
import sqlite3
from google.cloud import bigquery

## Retrieval and Storage of Basic files from Google Cloud Storage

In [2]:
BUCKET_NAME = 'raw_profiles'
PLATE_NUMBER = '24277'
LOCAL_DATA_PATH = os.path.join(os.path.expanduser('~'), ".morpho_minds_data")
MODEL_TARGET = 'local'

In [3]:
def create_folder_structure(plate_number):
        """
        Check for folder structure and create it when needed.
        """
        ## Check if data folders exists. If not, create it.
        if not os.path.exists(LOCAL_DATA_PATH):
            os.makedirs(LOCAL_DATA_PATH)
            os.makedirs(Path(LOCAL_DATA_PATH).joinpath(plate_number, 'raw'))
            os.makedirs(Path(LOCAL_DATA_PATH).joinpath(plate_number, 'processed'))

        if not os.path.exists(Path(LOCAL_DATA_PATH).joinpath(plate_number)):
            os.makedirs(Path(LOCAL_DATA_PATH).joinpath(plate_number))
            os.makedirs(Path(LOCAL_DATA_PATH).joinpath(plate_number, 'raw'))
            os.makedirs(Path(LOCAL_DATA_PATH).joinpath(plate_number, 'processed'))

        if not os.path.exists(Path(LOCAL_DATA_PATH).joinpath(plate_number, 'raw')):
            os.makedirs(Path(LOCAL_DATA_PATH).joinpath(plate_number, 'raw'))

        if not os.path.exists(Path(LOCAL_DATA_PATH).joinpath(plate_number, 'processed')):
            os.makedirs(Path(LOCAL_DATA_PATH).joinpath(plate_number, 'processed'))

In [4]:
def download_blob(bucket_name, source_blob_name, destination_file_name):
    """
    Download a file from GCS. Is called blob so is generic but will retrieve the SQLite DB.

    :param bucket_name: The name of the bucket
    :param source_blob_name: The name of the blob
    :param destination_file_name: The name of the file to save the blob to
    """
    # Initialize a client
    storage_client = storage.Client()
    # Get the bucket
    bucket = storage_client.bucket(bucket_name)
    # Get the blob
    blob = bucket.blob(source_blob_name)
    # Download the blob to a destination file
    with open(destination_file_name, 'wb') as f:
        with tqdm.wrapattr(f, "write", total=blob.size) as file_obj:
            storage_client.download_blob_to_file(blob, file_obj)


## Merge of Dataframes into Useful Data

In [5]:
# AGP == Ph-golgi
# DNA == Hoechst
# ER == ERSyto
# Mito == Mito
# RNA == ERSytoBleed

In [131]:
class Plate:
    def __init__(self, plate_number=None, chem_df=None, images_df=None, well_df=None, plate_df=None):
        self.plate_number = plate_number
        self.chem_df = chem_df
        self.images_df = images_df
        self.well_df = well_df
        self.plate_df = plate_df
        self.chem_cols = ['BROAD_ID', 'CPD_NAME', 'CPD_NAME_TYPE', 'SOURCE_NAME', 'CPD_SMILES']
        self.well_cols = ['Metadata_Well', 'Metadata_ASSAY_WELL_ROLE', 'Metadata_broad_sample', 'Metadata_mmoles_per_liter']


    def load(self):
        """
        Load the data from the local or remote source.
        """
        if MODEL_TARGET == 'local':
            print('Loading local data...')
            pictures_dir = Path(LOCAL_DATA_PATH).joinpath(PLATE_NUMBER, 'raw', 'raw_pictures')
            processed_file = Path(LOCAL_DATA_PATH).joinpath(PLATE_NUMBER, 'processed', f'{PLATE_NUMBER}_processed.csv')
            if not pictures_dir.is_dir():
                print('Local pictures not found. Please make sure the data is available in the local folder.')
                exit(1)
            elif not processed_file.is_file():
                print('Local processed file not found. Trying to retrieve data from Big Query...')
                try:
                    full_table_name = f"{GCP_PROJECT}.{BQ_DATASET}.{PLATE_NUMBER}_processed"
                    data = big_query_read(GCP_PROJECT, full_table_name)
                    data = data.to_dataframe()

                    # Save it locally to accelerate the next queries!
                    self.processed_df = data.to_csv(processed_file, header=True, index=False)
                except:
                    print('Big Query data not found. Retrieving data from remote server...')
                    self.retrieve_data()
                    self.merge_data()
                    #self.save()

        if MODEL_TARGET == 'prod':

            print('Malo')

    def retrieve_data(self):
        """
        Load the all the plate data into different dataframes.
        """
        # Check for folder structure and create it when needed.
        create_folder_structure(self.plate_number)

        ## Check that file chemical_compounds.csv exists locally. If not, download it.
        data_query_cache_path = Path(LOCAL_DATA_PATH).joinpath(self.plate_number, 'chemical_annotations.csv')
        data_query_cached_exists = data_query_cache_path.is_file()

        if data_query_cached_exists:
            print('Loading Chemical Annotations from local CSV...')
            self.chem_df = pd.read_csv(data_query_cache_path)
            
        else:
            print('Loading Chemical Annotations from remote server...')
            download_blob(BUCKET_NAME,
                        f'{self.plate_number}/chemical_annotations.csv',
                        Path(LOCAL_DATA_PATH).joinpath(self.plate_number, 'chemical_annotations.csv')
                        )
            
            self.chem_df = pd.read_csv(data_query_cache_path)
        self.chem_df = self.chem_df[self.chem_cols]
        self.chem_df.rename(columns={'BROAD_ID': 'DrugID',
                                     'CPD_NAME': 'CPDName',
                                     'CPD_NAME_TYPE': 'CPDTypeName',
                                     'SOURCE_NAME': 'SourceName',
                                     'CPD_SMILES': 'CPDSmiles'}, inplace=True)

        ## Check that sqlite db exists locally. If not, download it.
        data_query_cache_path = Path(LOCAL_DATA_PATH).joinpath(self.plate_number, 'raw', f'{self.plate_number}.sqlite')
        data_query_cached_exists = data_query_cache_path.is_file()

        if data_query_cached_exists:
            print('Loading SQLite DB from local DB...')
        else:
            print('Loading SQLite DB from remote DB...')
            download_blob(BUCKET_NAME,
                        f'{self.plate_number}/{self.plate_number}.sqlite',
                        Path(LOCAL_DATA_PATH).joinpath(self.plate_number, 'raw', f'{self.plate_number}.sqlite')
                        )

        conn = sqlite3.connect(data_query_cache_path)
        query = """
                SELECT Image_URL_OrigAGP, Image_URL_OrigDNA, Image_URL_OrigER, Image_URL_OrigMito, Image_URL_OrigRNA, Image_Count_Cells
                FROM Image
                """
        cursor = conn.execute(query)
        data = cursor.fetchall()
        self.images_df = pd.DataFrame(data, columns=['Ph-golgi', 'Hoechst', 'ERSyto', 'Mito', 'ERSytoBleed', 'CellCount'])

        conn.close()

        ## Check that mean_well_profile.csv exists. If not, download it.
        data_query_cache_path = Path(LOCAL_DATA_PATH).joinpath(self.plate_number, 'raw', 'mean_well_profiles.csv')
        data_query_cached_exists = data_query_cache_path.is_file()

        if data_query_cached_exists:
            print('Loading Well Profiles from local CSV...')
            self.well_df = pd.read_csv(data_query_cache_path)
        else:
            print('Loading Well Profiles from remote server...')
            download_blob(BUCKET_NAME,
                        f'{self.plate_number}/mean_well_profiles.csv',
                        Path(LOCAL_DATA_PATH).joinpath(self.plate_number, 'raw', 'mean_well_profiles.csv')
                        )
            self.well_df = pd.read_csv(data_query_cache_path)
        self.well_df = self.well_df[self.well_cols]
        self.well_df.rename(columns={'Metadata_Well': 'Well', 
                                                    'Metadata_ASSAY_WELL_ROLE': 'Role', 
                                                    'Metadata_broad_sample': 'DrugID', 
                                                    'Metadata_mmoles_per_liter': 'MMoles'}, inplace=True)
        
        print('✅ Data loaded successfully.')

        return self

    def merge_data(self):
        """
        Clean the data.
        """
        print('Extracting well from picture file name...')
        wells_df = self.images_df.drop(columns=['CellCount']).applymap(lambda x: x.split('/')[-1].split('_')[1])
        wells_df['Well'] = wells_df.apply(lambda row: row.unique()[0] if row.nunique()==1 else 0, axis=1)

        print('Extracting photo id from picture file name...')
        photo_number_df = self.images_df.drop(columns=['CellCount',]).applymap(lambda x: x.split('/')[-1].split('_')[2])
        photo_number_df['PhotoNumber'] = photo_number_df.apply(lambda row: int(row.unique()[0][1]) if row.nunique()==1 else float('NaN'), axis=1)

        print('Converting photo path for training...')
        self.images_df['Ph-golgi'] = self.images_df['Ph-golgi'].apply(lambda x: str(Path(LOCAL_DATA_PATH).joinpath(PLATE_NUMBER, 'Raw_pictures', f'{PLATE_NUMBER}-Ph_golgi', x.split('/')[-1])))
        self.images_df['Hoechst'] = self.images_df['Hoechst'].apply(lambda x: str(Path(LOCAL_DATA_PATH).joinpath(PLATE_NUMBER, 'Raw_pictures', f'{PLATE_NUMBER}-Hoechst', x.split('/')[-1])))
        self.images_df['ERSyto'] = self.images_df['ERSyto'].apply(lambda x: str(Path(LOCAL_DATA_PATH).joinpath(PLATE_NUMBER, 'Raw_pictures', f'{PLATE_NUMBER}-ERSyto', x.split('/')[-1])))
        self.images_df['Mito'] = self.images_df['Mito'].apply(lambda x: str(Path(LOCAL_DATA_PATH).joinpath(PLATE_NUMBER, 'Raw_pictures', f'{PLATE_NUMBER}-Mito', x.split('/')[-1])))
        self.images_df['ERSytoBleed'] = self.images_df['ERSytoBleed'].apply(lambda x: str(Path(LOCAL_DATA_PATH).joinpath(PLATE_NUMBER, 'Raw_pictures', f'{PLATE_NUMBER}-ERSytoBleed', x.split('/')[-1])))

        print('Concatenating...')
        self.concat_df = pd.concat([
            self.images_df,
            wells_df['Well'],
            photo_number_df['PhotoNumber'].astype('int8'),
        ],
        axis = 1)
        
        self.concat_df[['CellCount']] = self.concat_df[['CellCount']].astype('int8')
        self.well_df[['MMoles']] = self.well_df[['MMoles']].astype('float16')

        print('Identifying drugs used per well...')

        self.processed_df = self.concat_df.merge(self.well_df).merge(self.chem_df, how='left', on='DrugID').fillna('None')

        print('✅ Data Merged')
        
    def save(self):
        pass
            
plate = Plate('24277')
plate.load()
images_df = plate.images_df
chem_df = plate.chem_df
well_df = plate.well_df
concat_df = plate.concat_df
processed_df = plate.processed_df

Loading local data...
Local processed file not found. Trying to retrieve data from Big Query...
Big Query data not found. Retrieving data from remote server...
Loading Chemical Annotations from local CSV...
Loading SQLite DB from local DB...
Loading Well Profiles from local CSV...
✅ Data loaded successfully.
Extracting well from picture file name...
Extracting photo id from picture file name...
Converting photo path for training...
Concatenating...
Identifying drugs used per well...
✅ Data Merged


In [132]:
processed_df.shape

(2297, 15)

In [133]:
processed_df.isna().sum()

Ph-golgi       0
Hoechst        0
ERSyto         0
Mito           0
ERSytoBleed    0
CellCount      0
Well           0
PhotoNumber    0
Role           0
DrugID         0
MMoles         0
CPDName        0
CPDTypeName    0
SourceName     0
CPDSmiles      0
dtype: int64

In [134]:
series_1 = processed_df.iloc[0,:]
display(series_1)
for i in range(len(series_1)):
    display(f'{i} Column {series_1.keys()[i]} is of type {type(series_1[i])}')

Ph-golgi       /Users/pepe/.morpho_minds_data/24277/Raw_pictu...
Hoechst        /Users/pepe/.morpho_minds_data/24277/Raw_pictu...
ERSyto         /Users/pepe/.morpho_minds_data/24277/Raw_pictu...
Mito           /Users/pepe/.morpho_minds_data/24277/Raw_pictu...
ERSytoBleed    /Users/pepe/.morpho_minds_data/24277/Raw_pictu...
CellCount                                                     91
Well                                                         a01
PhotoNumber                                                    1
Role                                                     treated
DrugID                                    BRD-K18250272-003-03-7
MMoles                                                  3.023438
CPDName                                             propoxycaine
CPDTypeName                                                  INN
SourceName                               Prestwick Chemical Inc.
CPDSmiles                         CCCOc1cc(N)ccc1C(=O)OCCN(CC)CC
Name: 0, dtype: object

"0 Column Ph-golgi is of type <class 'str'>"

"1 Column Hoechst is of type <class 'str'>"

"2 Column ERSyto is of type <class 'str'>"

"3 Column Mito is of type <class 'str'>"

"4 Column ERSytoBleed is of type <class 'str'>"

"5 Column CellCount is of type <class 'numpy.int8'>"

"6 Column Well is of type <class 'str'>"

"7 Column PhotoNumber is of type <class 'numpy.int8'>"

"8 Column Role is of type <class 'str'>"

"9 Column DrugID is of type <class 'str'>"

"10 Column MMoles is of type <class 'numpy.float16'>"

"11 Column CPDName is of type <class 'str'>"

"12 Column CPDTypeName is of type <class 'str'>"

"13 Column SourceName is of type <class 'str'>"

"14 Column CPDSmiles is of type <class 'str'>"

In [113]:
second_merge = concat_df.merge(well_df).merge(chem_df, how='left', on='DrugID')
second_merge.shape

(2297, 15)

In [114]:
second_merge.isna().sum()

Ph-golgi         0
Hoechst          0
ERSyto           0
Mito             0
ERSytoBleed      0
CellCount        0
Well             0
PhotoNumber      0
Role             0
DrugID           0
MMoles           0
CPDName        384
CPDTypeName    384
SourceName     384
CPDSmiles      384
dtype: int64

In [115]:
second_merge = second_merge.fillna('None')
second_merge.isna().sum()

Ph-golgi       0
Hoechst        0
ERSyto         0
Mito           0
ERSytoBleed    0
CellCount      0
Well           0
PhotoNumber    0
Role           0
DrugID         0
MMoles         0
CPDName        0
CPDTypeName    0
SourceName     0
CPDSmiles      0
dtype: int64

In [104]:
series_2 = second_merge.iloc[0,:]
display(series_2)
for i in range(len(series_1)):
    display(f'{i} Column {series_2.keys()[i]} is of type {type(series_2[i])}')

Ph-golgi       /Users/pepe/.morpho_minds_data/24277/Raw_pictu...
Hoechst        /Users/pepe/.morpho_minds_data/24277/Raw_pictu...
ERSyto         /Users/pepe/.morpho_minds_data/24277/Raw_pictu...
Mito           /Users/pepe/.morpho_minds_data/24277/Raw_pictu...
ERSytoBleed    /Users/pepe/.morpho_minds_data/24277/Raw_pictu...
CellCount                                                   91.0
Well                                                         a01
PhotoNumber                                                    1
Role                                                     treated
DrugID                                    BRD-K18250272-003-03-7
MMoles                                                  3.023438
CPDName                                             propoxycaine
CPDTypeName                                                  INN
SourceName                               Prestwick Chemical Inc.
CPDSmiles                         CCCOc1cc(N)ccc1C(=O)OCCN(CC)CC
Name: 0, dtype: object

"0 Column Ph-golgi is of type <class 'str'>"

"1 Column Hoechst is of type <class 'str'>"

"2 Column ERSyto is of type <class 'str'>"

"3 Column Mito is of type <class 'str'>"

"4 Column ERSytoBleed is of type <class 'str'>"

"5 Column CellCount is of type <class 'numpy.float16'>"

"6 Column Well is of type <class 'str'>"

"7 Column PhotoNumber is of type <class 'numpy.int8'>"

"8 Column Role is of type <class 'str'>"

"9 Column DrugID is of type <class 'str'>"

"10 Column MMoles is of type <class 'numpy.float16'>"

In [85]:
series_1 = concat_df.merge(well_df).iloc[0,:]
series_1

Ph-golgi       /Users/pepe/.morpho_minds_data/24277/Raw_pictu...
Hoechst        /Users/pepe/.morpho_minds_data/24277/Raw_pictu...
ERSyto         /Users/pepe/.morpho_minds_data/24277/Raw_pictu...
Mito           /Users/pepe/.morpho_minds_data/24277/Raw_pictu...
ERSytoBleed    /Users/pepe/.morpho_minds_data/24277/Raw_pictu...
CellCount                                                   91.0
Well                                                         a01
PhotoNumber                                                    1
Role                                                     treated
DrugID                                    BRD-K18250272-003-03-7
MMoles                                                  3.023438
Name: 0, dtype: object

In [93]:
display(first_merge.info())
display(chem_df.info())

<class 'pandas.core.frame.DataFrame'>
Int64Index: 2297 entries, 0 to 2296
Data columns (total 11 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   Ph-golgi     2297 non-null   object 
 1   Hoechst      2297 non-null   object 
 2   ERSyto       2297 non-null   object 
 3   Mito         2297 non-null   object 
 4   ERSytoBleed  2297 non-null   object 
 5   CellCount    2297 non-null   float16
 6   Well         2297 non-null   object 
 7   PhotoNumber  2297 non-null   int8   
 8   Role         2297 non-null   object 
 9   DrugID       2297 non-null   object 
 10  MMoles       2297 non-null   float16
dtypes: float16(2), int8(1), object(8)
memory usage: 172.7+ KB


None

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 30616 entries, 0 to 30615
Data columns (total 5 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   DrugID       30616 non-null  object
 1   CPDName      30612 non-null  object
 2   CPDTypeName  30612 non-null  object
 3   SourceName   30612 non-null  object
 4   CPDSmiles    30612 non-null  object
dtypes: object(5)
memory usage: 1.2+ MB


None

In [86]:
for i in range(len(series_1)):
    display(f'{i} Column {series_1.keys()[i]} is of type {type(series_1[i])}')

"0 Column Ph-golgi is of type <class 'str'>"

"1 Column Hoechst is of type <class 'str'>"

"2 Column ERSyto is of type <class 'str'>"

"3 Column Mito is of type <class 'str'>"

"4 Column ERSytoBleed is of type <class 'str'>"

"5 Column CellCount is of type <class 'numpy.float16'>"

"6 Column Well is of type <class 'str'>"

"7 Column PhotoNumber is of type <class 'numpy.int8'>"

"8 Column Role is of type <class 'str'>"

"9 Column DrugID is of type <class 'str'>"

"10 Column MMoles is of type <class 'numpy.float16'>"

In [None]:
well_df.reset_index(drop=True, inplace=True)
chem_df.reset_index(drop=True, inplace=True)
display(well_df.describe())
display(chem_df.describe())
well_df.merge(chem_df, how='left', on='')

## BigQuery Test

In [None]:
gcp_project = 'gifted-electron-411311'
GCP_PROJECT = 'gifted-electron-411311'
BQ_REGION='EU'
BQ_DATASET='morpho_minds'
PLATE_NUMBER='24277'
full_table_name = f"{GCP_PROJECT}.{BQ_DATASET}.{PLATE_NUMBER}_processed"

data = pd.read_csv('/Users/pepe/.morpho_minds_data/24277/processed/24277_processed.csv')

client = bigquery.Client(project=gcp_project)
write_mode = "WRITE_TRUNCATE"


In [None]:
client = bigquery.Client()

In [None]:
job_config = bigquery.LoadJobConfig(write_disposition=write_mode)

In [None]:
job = client.load_table_from_dataframe(data, full_table_name, job_config=job_config)
job

In [None]:
data['Ph-golgi'][0]

In [None]:
data['Hoechst'][0]

In [135]:
import os

In [136]:
os.path

<module 'posixpath' from '/Users/pepe/.pyenv/versions/3.10.6/lib/python3.10/posixpath.py'>