In [14]:
import pandas as pd
import re

class VCFBase:
    """
    базовый класс для работы с vcf файлами
    содержит общую функциональность для парсинга и анализа
    """

    def __init__(self, vcf_file):
        """
        инициализация базового класса
        vcf_file: путь к vcf файлу
        """
        self.vcf_file = vcf_file
        self.metadata = {}
        self.header = []
        self.samples = []
        self.variants_df = None



    def _parse_metadata(self, metadata_lines):
        """
        парсинг метаданных vcf файла
        :))))re.match проверяет, соответствует ли начало строки заданному регулярному выражению

        metadata_lines: список строк метаданных
        """
        self.metadata = {'fileformat': '', 'info': {}, 'format': {}, 'filter': {}, 'other': {}}

        for line in metadata_lines:
            if line.startswith('##fileformat='):
                self.metadata['fileformat'] = line.split('=', 1)[1]
            elif line.startswith('##INFO='):
                match = re.match(r'##INFO=<ID=(\w+),.*,Description="([^"]*)">', line, re.IGNORECASE)
                if match:
                    self.metadata['info'][match.group(1)] = match.group(2)
            elif line.startswith('##FORMAT='):
                match = re.match(r'##FORMAT=<ID=(\w+),.*,Description="([^"]*)">', line, re.IGNORECASE)
                if match:
                    self.metadata['format'][match.group(1)] = match.group(2)
            elif line.startswith('##FILTER='):
                match = re.match(r'##FILTER=<ID=(\w+),.*,Description="([^"]*)">', line, re.IGNORECASE)
                if match:
                    self.metadata['filter'][match.group(1)] = match.group(2)
            elif '=' in line[2:]:
                key, value = line[2:].split('=', 1)
                self.metadata['other'][key] = value

    def _parse_info_field(self, info_str):
        """
        парсинг инф поля в словарь
        info_str строка info поля

        возврат словарь с распарсенными значениями
        """
        info_dict = {}
        if info_str != '.':
            for item in info_str.split(';'):
                if '=' in item:
                    key, value = item.split('=', 1)
                    try:
                        if '.' in value:
                            value = float(value)
                        else:
                            value = int(value)
                    except:
                        pass
                    info_dict[key.lower()] = value
                else:
                    info_dict[item.lower()] = True
        return info_dict


class VCFReader(VCFBase):
    """
    класс для чтения vcf файлов
    !!!!!!!!!!!!! наследует базовую функциональность от VCFBase
    """

    def parse_file(self):
        """
        чтение и парсинг vcf файла
        создает pandas dataframe с вариантами
        """
        print(f"чтение vcf файла: {self.vcf_file}")

        metadata_lines = []
        header_line = ""
        variant_lines = []

        with open(self.vcf_file, 'r') as f:
            for line in f:
                line = line.strip()
                if line.startswith('##'):
                    metadata_lines.append(line)
                elif line.startswith('#CHROM'):
                    header_line = line
                else:
                    variant_lines.append(line)

        self._parse_metadata(metadata_lines)
        self._parse_header(header_line)

        if variant_lines:
            self.variants_df = self._create_dataframe(variant_lines)
            print(f"общее кол-во вариантов {len(self.variants_df)} ")
        else:
            print("вариантов нет")

    def _parse_header(self, header_line):
        """
        парсинг строки заголовка vcf

        header_line строка заголовка
        """
        if header_line:
           self.header = header_line[1:].split('\t')
           if len(self.header) > 9:
                self.samples = self.header[9:]
                print(f"кол-во образцов {len(self.samples)}")

    def _create_dataframe(self, variant_lines):
        """
        создание pandas dataframe из строк вариантов

        variant_lines: список строк с вариантами

        вывод pandas dataframe с вариантами
        """
        data = []
        for line in variant_lines:
            fields = line.split('\t')
            if len(fields) >= 8:
                variant_data = {
                    'chrom': fields[0],
                    'pos': int(fields[1]),
                    'id': fields[2],
                    'ref': fields[3],
                    'alt': fields[4],
                    'qual': self._parse_qual(fields[5]),
                    'filter': fields[6],
                    'info': fields[7]
                }

                # добавляем информацию из info поля
                info_dict = self._parse_info_field(fields[7])
                variant_data.update(info_dict)

                data.append(variant_data)

        return pd.DataFrame(data)

    def _parse_qual(self, qual_str):
        """
        парсинг поля качества

        qual_str строка с качеством

        вывод числовое значение качества или none
        """
        if qual_str != '.':
            try:
                return float(qual_str)
            except:
                return None
        return None


class VCFAnalyzer(VCFReader):
    """
    класс для анализа vcf файлов
    !!!!!! наследует функциональность чтения от VCFReader
    добавляет методы анализа
    """

    def get_header_info(self, group=None):
        """
        получение информации о заголовке и метаданных


        group конкретная группа метаданных ('info', 'format', 'filter')

        вывод словарь с информацией о запрошенной группе
        """
        if not self.metadata:
            print("метаданных нет")
            return {}

        if group:
            return self.metadata.get(group.lower(), {})
        else:
            return {
                'fileformat': self.metadata['fileformat'],
                'samples': self.samples,
                'header_columns': self.header
            }

    def get_variant_count(self):
        """
        получение общего количества вариантов

        вывод количество вариантов в файле
        """
        if self.variants_df is None:
            print("данные не загружены")
            return 0

        count = len(self.variants_df)
        print(f"количество вариантов: {count}")
        return count

    def get_alignment_statistics(self, region=None):
        """
        получение статистики по выравниваниям в регионах

         region конкретная хромосома для фильтрации

        вывод pandas dataframe со статистикой
        """
        if self.variants_df is None:
            print("данные не загружены")
            return pd.DataFrame()

        # полиморфизм - разное поведение в зависимости от региона
        if region:
            data = self.variants_df[self.variants_df['chrom'] == region]
            if data.empty:
                print(f"не найдено вариантов для региона {region}")
                return pd.DataFrame()
        else:
            data = self.variants_df

        # группируем по хромосоме и считаем статистику
        stats = data.groupby('chrom').agg({
            'pos': 'count',  # количество вариантов
            'dp': ['mean', 'median'] if 'dp' in data.columns else 'count'
        }).round(2)




        if 'dp' in data.columns:
            stats.columns = ['variant_count', 'mean_dp', 'median_dp']
        else:
            stats.columns = ['variant_count']

        return stats.reset_index()

    def get_variants_in_region(self, chrom, start, end):
        """
        поиск вариантов в указанном геномном регионе

        chrom хромосома
        start начальная позиция
        end конечная позиция

        вывод pandas dataframe с вариантами в регионе
        """
        if self.variants_df is None:
            print("данные не загружены")
            return pd.DataFrame()

        # фильтруем варианты по позиции
        variants_in_region = self.variants_df[
            (self.variants_df['chrom'] == chrom) &
            (self.variants_df['pos'] >= start) &
            (self.variants_df['pos'] <= end)
        ]

        print(f"найдено вариантов в регионе {chrom}:{start}-{end}: {len(variants_in_region)}")
        return variants_in_region

    def get_variant_type_stats(self):
        """
        получение статистики по типам вариантов

        вывод словарь с количеством вариантов по типам
        """
        if self.variants_df is None:
            print("данных нет")
            return {}

        stats = {'snp': 0, 'indel': 0, 'other': 0}

        for _, variant in self.variants_df.iterrows():
            ref_len = len(variant['ref'])
            alt_len = len(variant['alt'])

            if ref_len == 1 and alt_len == 1:
                stats['snp'] += 1
            elif ref_len != alt_len:
                stats['indel'] += 1
            else:
                stats['other'] += 1

        return stats


class VCFReportGenerator(VCFAnalyzer):
    """
    класс для генерации отчетов по vcf файлам
    !!!!! наследует всю функциональность анализа
    добавляет методы генерации отчетов
    """

    def generate_summary_report(self):
        """
        генерация сводного отчета по vcf файлу

        вывод словарь с общей статистикой
        """
        if self.variants_df is None:
            print("данных нет")
            return {}


        """
        unique() — возвращает массив уникальных значений из Series, сохраняя порядок их появления
        tolist() — преобразует объект в список
        to_dict() — преобразует DataFrame в словарь
        """


        report = {
            'общее_количество_вариантов': len(self.variants_df),
            'хромосомы': self.variants_df['chrom'].unique().tolist(),

            'типы_вариантов': self.get_variant_type_stats(),
            'статистика_по_регионам': self.get_alignment_statistics().to_dict('records')
        }

        # добавляем статистику качества если есть
        if 'qual' in self.variants_df.columns and self.variants_df['qual'].notna().any():
            report['качество'] = {
                'среднее': round(self.variants_df['qual'].mean(), 2),
                'медиана': round(self.variants_df['qual'].median(), 2)
            }

        # добавляем статистику глубины если есть
        if 'dp' in self.variants_df.columns and self.variants_df['dp'].notna().any():
            report['глубина_покрытия'] = {
                'средняя': round(self.variants_df['dp'].mean(), 2),
                'медиана': round(self.variants_df['dp'].median(), 2)
            }

        return report

    def save_variants_to_file(self, variants_df, output_file):
        """
        сохранение вариантов в файл

        variants_dz dataframe с вариантами
        output_file путь для сохранения
        """
        if variants_df.empty:
            print("нет данных")
            return

        variants_df.to_csv(output_file, sep='\t', index=False) #пересохранение



def main():



    vcf_file = "/content/S1.haplotypecaller.filtered.phased.csq.vcf"
    analyzer = VCFReportGenerator(vcf_file)
    analyzer.parse_file()

    if analyzer.variants_df is None:
        print("нет данных")
        return

    # заголовок
    print("заголовок")
    header_info = analyzer.get_header_info()
    print(f"формат: {header_info.get('fileformat', )}")
    print(f"образцы: {header_info.get('samples', [])}")

    # количество вариантов
    print("количество вариантов:")
    count = analyzer.get_variant_count()

    # статистика по выравниваниям
    print("статистика по выравниваниям")
    stats = analyzer.get_alignment_statistics()
    if not stats.empty:
        print(stats.to_string(index=False))

    # поиск в регионе
    print("поиск вариантов в регионе")

    region_variants = analyzer.get_variants_in_region("chr1", 69270, 200000) # ЭТО МОЖНО МЕНЯТЬ !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    if not region_variants.empty:
        print("первые 5 вариантов:")
        cols = ['chrom', 'pos', 'ref', 'alt', 'qual']
        available_cols = [c for c in cols if c in region_variants.columns]
        print(region_variants[available_cols].head().to_string(index=False))

    report = analyzer.generate_summary_report()
    print(f"хромосомы {report.get('хромосомы', [])}")
    print(f"типы вариантов {report.get('типы_вариантов', {})}")


if __name__ == "__main__":
    main()

чтение vcf файла: /content/S1.haplotypecaller.filtered.phased.csq.vcf
кол-во образцов 1
общее кол-во вариантов 30318 
заголовок
формат: VCFv4.2
образцы: ['ERR031940_S1']
количество вариантов:
количество вариантов: 30318
статистика по выравниваниям
           chrom  variant_count  mean_dp  median_dp
            chr1           3375    85.97       70.0
           chr10           1333    96.28       80.0
           chr11           2044    94.49       76.0
           chr12           1695    98.06       84.0
           chr13            572   118.40      108.0
           chr14            882    91.03       78.5
           chr15            843    77.99       69.0
           chr16           1153    63.82       46.0
           chr17           1817    69.24       52.0
           chr18            449   100.68       93.0
           chr19           2270    73.45       51.0
            chr2           1832    92.44       81.0
           chr20            857    78.08       60.0
           chr21        