In [1]:
# Install the third-party packages into the notebook runtime via shell commands.
# luigi is imported by the cells below.
# NOTE(review): wget is not referenced by any cell visible here - confirm it is
# still needed before keeping this install.
!pip install luigi
!pip install wget


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting luigi
  Downloading luigi-3.3.0.tar.gz (1.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m12.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting python-daemon (from luigi)
  Downloading python_daemon-3.0.1-py3-none-any.whl (31 kB)
Collecting lockfile>=0.10 (from python-daemon->luigi)
  Downloading lockfile-0.12.2-py2.py3-none-any.whl (13 kB)
Building wheels for collected packages: luigi
  Building wheel for luigi (setup.py) ... [?25l[?25hdone
  Created wheel for luigi: filename=luigi-3.3.0-py3-none-any.whl size=1085292 sha256=4107fdb247cf9fd261e7ec516cb9f1d938599e0262feed406c2b87278a7e96c4
  Stored in directory: /root/.cache/pip/wheels/1b/3b/d5/c999c34bd8478e559f006b83333be40ddf5fab360cf2c6f720
Successfully built luigi
Installing collected packages: lockfile, python-daemon, luigi
Succe

In [2]:
import gzip
import io
import os
import shutil
import tarfile
import urllib.request

import luigi
import pandas as pd

In [3]:
# Remote directory (NCBI GEO, series GSE68849 "suppl" folder) that the download
# task fetches from. The trailing slash is significant for how the URL is built.
BASE_URL = "https://ftp.ncbi.nlm.nih.gov/geo/series/GSE68nnn/GSE68849/suppl/"
# File name of the raw archive, used both as the remote file name and as the
# local file name under data/<dataset_name>/.
RAW_TAR = "GSE68849_RAW.tar"

In [4]:
class DownloadRawData(luigi.Task):
    """Download the raw archive ``RAW_TAR`` into ``data/<dataset_name>/``.

    Parameters:
        dataset_name: name of the sub-directory of ``data/`` that receives the
            downloaded archive.

    Output:
        A ``luigi.LocalTarget`` pointing at ``data/<dataset_name>/<RAW_TAR>``.
    """

    dataset_name = luigi.Parameter()
    # Remote directory the archive is fetched from.
    base_url = BASE_URL

    def output(self):
        """Return the local target for the downloaded archive."""
        return luigi.LocalTarget(os.path.join('data/', self.dataset_name, RAW_TAR))

    def run(self):
        """Fetch the archive and publish it atomically at the output path."""
        target_path = self.output().path
        os.makedirs(os.path.dirname(target_path), exist_ok=True)

        # URLs always use '/', so build the address with string operations
        # instead of os.path.join (which follows the local OS path rules).
        url = self.base_url.rstrip('/') + '/' + RAW_TAR

        # Download next to the final location and rename only on success, so
        # an interrupted transfer never leaves a truncated file at the path
        # Luigi checks to decide whether this task is complete.
        partial_path = target_path + '.part'
        try:
            urllib.request.urlretrieve(url, partial_path)
            os.replace(partial_path, target_path)
        finally:
            # Only present here if the download or the rename failed.
            if os.path.exists(partial_path):
                os.remove(partial_path)

In [5]:
class ExtractGzFiles(luigi.Task):
    """Unpack the downloaded tar archive into ``data/<dataset_name>/gz_files``.

    Parameters:
        dataset_name: name of the sub-directory of ``data/`` holding the archive.

    Requires:
        ``DownloadRawData`` for the same ``dataset_name``.

    Output:
        A ``luigi.LocalTarget`` pointing at the ``gz_files`` directory.
    """

    dataset_name = luigi.Parameter()

    def requires(self):
        """Depend on the archive download for this dataset."""
        return DownloadRawData(dataset_name=self.dataset_name)

    def output(self):
        """Return the local target for the directory of extracted files."""
        return luigi.LocalTarget(os.path.join('data/', self.dataset_name, 'gz_files'))

    def run(self):
        """Extract the archive into a staging directory, then publish it."""
        raw_tar_path = self.input().path
        extracted_folder_path = self.output().path

        # The output target is a directory, so Luigi considers the task done
        # as soon as that directory exists. Extract into a sibling staging
        # directory and rename it only after extraction succeeded.
        staging_path = extracted_folder_path + '.tmp'
        shutil.rmtree(staging_path, ignore_errors=True)  # leftovers of a failed run
        os.makedirs(staging_path)
        try:
            with tarfile.open(raw_tar_path, 'r') as tar:
                if hasattr(tarfile, 'data_filter'):
                    # Extraction filters are available: refuse members that
                    # would escape the destination (absolute paths, '..',
                    # links pointing outside) in the downloaded archive.
                    tar.extractall(path=staging_path, filter='data')
                else:
                    # Older Python without extraction filters.
                    tar.extractall(path=staging_path)
            os.replace(staging_path, extracted_folder_path)
        finally:
            # No-op after a successful rename; removes partial output otherwise.
            shutil.rmtree(staging_path, ignore_errors=True)

In [9]:
class ExtractTxt(luigi.Task):
    """Decompress every ``.gz`` file and split its text on blank lines.

    Each non-empty chunk of a decompressed file is written to
    ``data/<dataset_name>/txt_files/<stem>_<n>.txt``, where ``<stem>`` is the
    archive member name without its ``.gz`` / ``.txt.gz`` suffix and ``<n>`` is
    the 1-based position of the chunk within the file.

    Parameters:
        dataset_name: name of the sub-directory of ``data/`` to work in.

    Requires:
        ``ExtractGzFiles`` for the same ``dataset_name``.

    Output:
        A ``luigi.LocalTarget`` pointing at the ``txt_files`` directory.
    """

    dataset_name = luigi.Parameter()

    def requires(self):
        """Depend on the extracted ``.gz`` files for this dataset."""
        return ExtractGzFiles(dataset_name=self.dataset_name)

    def output(self):
        """Return the local target for the directory of text chunks."""
        return luigi.LocalTarget(os.path.join('data/', self.dataset_name, 'txt_files'))

    def run(self):
        """Write all text chunks into a staging directory, then publish it."""
        extracted_folder_path = self.input().path
        txt_files_path = self.output().path

        # Directory target: build everything in a staging directory and rename
        # at the end so a failure part-way does not mark the task complete.
        staging_path = txt_files_path + '.tmp'
        shutil.rmtree(staging_path, ignore_errors=True)  # leftovers of a failed run
        os.makedirs(staging_path)
        try:
            for root, _dirs, files in os.walk(extracted_folder_path):
                for file in sorted(files):
                    if not file.endswith('.gz'):
                        continue

                    # Strip '.gz' and, when present, the inner '.txt'. For
                    # '*.txt.gz' names this equals the previous file[:-7].
                    stem = file[:-len('.gz')]
                    if stem.endswith('.txt'):
                        stem = stem[:-len('.txt')]

                    with gzip.open(os.path.join(root, file), 'rt') as f:
                        content = f.read()

                    for i, table in enumerate(content.split("\n\n"), start=1):
                        # Consecutive or trailing blank lines produce empty
                        # chunks; skip them but keep the numbering of the rest.
                        if not table.strip():
                            continue
                        table_path = os.path.join(staging_path, f"{stem}_{i}.txt")
                        with open(table_path, "w") as txt_file:
                            txt_file.write(table)
            os.replace(staging_path, txt_files_path)
        finally:
            # No-op after a successful rename; removes partial output otherwise.
            shutil.rmtree(staging_path, ignore_errors=True)


In [10]:
class SplitTables(luigi.Task):
    """Split the text chunks into per-section TSV tables and reduce ``Probes``.

    Every ``.txt`` file under ``data/<dataset_name>/txt_files`` is scanned for
    section lines starting with ``[`` (for example ``[Probes]``). The lines that
    follow a section line are parsed as a tab-separated table and written to
    ``data/<dataset_name>/tables/<section>.tsv``. Finally a copy of the
    ``Probes`` table without a fixed set of columns is written to
    ``Probes_reduced.tsv``.

    Parameters:
        dataset_name: name of the sub-directory of ``data/`` to work in.

    Requires:
        ``ExtractTxt`` for the same ``dataset_name``.

    Output:
        A ``luigi.LocalTarget`` pointing at ``tables/Probes_reduced.tsv``.

    Raises:
        KeyError: if no ``Probes`` section was found, or if one of the columns
            to drop is missing from it.
    """

    dataset_name = luigi.Parameter()

    # Columns removed from the Probes table to build Probes_reduced.tsv.
    _DROPPED_PROBE_COLUMNS = (
        'Definition', 'Ontology_Component', 'Ontology_Process',
        'Ontology_Function', 'Synonyms', 'Obsolete_Probe_Id', 'Probe_Sequence',
    )

    def requires(self):
        """Depend on the text chunks for this dataset."""
        return ExtractTxt(dataset_name=self.dataset_name)

    def output(self):
        """Return the local target for the reduced Probes table."""
        return luigi.LocalTarget(f'data/{self.dataset_name}/tables/Probes_reduced.tsv')

    def _save_table(self, name, buffer, tables_dir, dfs):
        """Parse one buffered section, write ``<name>.tsv`` and record it in ``dfs``.

        Does nothing when there is no current section (``name`` is falsy) or
        when the section has no content, since ``pd.read_csv`` raises on an
        empty buffer.
        """
        if not name or not buffer.getvalue().strip():
            return
        buffer.seek(0)
        # The Heading section is read without a header row; every other
        # section takes its column names from its first line.
        header = None if name == 'Heading' else 'infer'
        df = pd.read_csv(buffer, sep='\t', header=header)
        dfs[name] = df
        df.to_csv(os.path.join(tables_dir, f'{name}.tsv'), sep='\t', index=False)

    def run(self):
        """Write one TSV per section, then the reduced Probes table."""
        extracted_folder_path = self.input().path
        output_path = self.output().path
        tables_dir = os.path.dirname(output_path)
        os.makedirs(tables_dir, exist_ok=True)

        dfs = {}
        # NOTE(review): tables are named by section only, so a section name
        # that occurs in several input files is overwritten by the later file.
        # Sorting makes "later" deterministic instead of depending on the
        # arbitrary os.listdir order.
        for file in sorted(os.listdir(extracted_folder_path)):
            if not file.endswith('.txt'):
                continue
            with open(os.path.join(extracted_folder_path, file)) as f:
                write_key = None
                fio = io.StringIO()
                for line in f:
                    if line.startswith('['):
                        # A new section starts: flush the one collected so far.
                        self._save_table(write_key, fio, tables_dir, dfs)
                        fio = io.StringIO()
                        write_key = line.strip('[]\n')
                        continue
                    if write_key:
                        fio.write(line)
                # Flush the last section of the file with the same rules as
                # the sections flushed inside the loop.
                self._save_table(write_key, fio, tables_dir, dfs)

        if 'Probes' not in dfs:
            raise KeyError(
                f"no [Probes] section found in the .txt files under {extracted_folder_path}"
            )
        probes_reduced = dfs['Probes'].drop(columns=list(self._DROPPED_PROBE_COLUMNS))

        # This file is the task's completion marker, so write it to a
        # temporary name and rename it only once it is fully written.
        partial_path = output_path + '.part'
        try:
            probes_reduced.to_csv(partial_path, sep='\t', index=False)
            os.replace(partial_path, output_path)
        finally:
            # Only present here if writing or renaming failed.
            if os.path.exists(partial_path):
                os.remove(partial_path)


In [12]:
if __name__ == '__main__':
    # Run the task that processes the Probes table, using Luigi's in-process
    # local scheduler.
    luigi.build([SplitTables(dataset_name='GSE68849')], local_scheduler=True)

DEBUG: Checking if SplitTables(dataset_name=GSE68849) is complete
DEBUG:luigi-interface:Checking if SplitTables(dataset_name=GSE68849) is complete
DEBUG: Checking if ExtractTxt(dataset_name=GSE68849) is complete
DEBUG:luigi-interface:Checking if ExtractTxt(dataset_name=GSE68849) is complete
INFO: Informed scheduler that task   SplitTables_GSE68849_e34547c091   has status   PENDING
INFO:luigi-interface:Informed scheduler that task   SplitTables_GSE68849_e34547c091   has status   PENDING
DEBUG: Checking if ExtractGzFiles(dataset_name=GSE68849) is complete
DEBUG:luigi-interface:Checking if ExtractGzFiles(dataset_name=GSE68849) is complete
INFO: Informed scheduler that task   ExtractTxt_GSE68849_e34547c091   has status   PENDING
INFO:luigi-interface:Informed scheduler that task   ExtractTxt_GSE68849_e34547c091   has status   PENDING
INFO: Informed scheduler that task   ExtractGzFiles_GSE68849_e34547c091   has status   DONE
INFO:luigi-interface:Informed scheduler that task   ExtractGzFiles_