In [11]:
import os

def process_file(input_file, format_type="ftdna"):
    output_lines = ["RSID,CHROMOSOME,POSITION,RESULT\n"]  
    
    with open(input_file, 'r', encoding='utf-8', errors='ignore') as infile:
        if format_type == "ftdna":
            for line in infile:
                if line.strip() and not line.startswith("__") and not line.startswith("||||") and not line.startswith("#"):
                    output_lines.append(line)
                    
        elif format_type == "ancestry" or format_type == "23andme":
            for line in infile:
                if line.startswith("#"):
                    continue  
                
                fields = line.strip().split("\t")
                if len(fields) == 4:
                    rsid, chrom, pos, result = fields
                    output_lines.append(f"{rsid},{chrom},{pos},{result}\n")
    
    return output_lines


def detect_format(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
            first_line = file.readline()
            
            if "ftdna" in first_line.lower() or "RSID" in first_line:
                return "ftdna"
            elif "AncestryDNA" in first_line:
                return "ancestry"
            elif "23andMe" in first_line:
                return "23andme"
            else:
                
                for _ in range(10):  
                    line = file.readline()
                    if "ftdna" in line.lower():
                        return "ftdna"
                    elif "ancestry" in line.lower():
                        return "ancestry"
                    elif "23andme" in line.lower():
                        return "23andme"
                return None
    except Exception as e:
        print(f"Error reading {file_path}: {e}")
        return None


def process_all_files(input_folder, output_folder):
    os.makedirs(output_folder, exist_ok=True)
    
    for file_name in os.listdir(input_folder):
        file_path = os.path.join(input_folder, file_name)
        
        file_format = detect_format(file_path)
        
        if file_format:
            processed_data = process_file(file_path, file_format)
            
            output_file_path = os.path.join(output_folder, f"{file_name}_processed.csv")
            with open(output_file_path, 'w', encoding='utf-8') as outfile:
                outfile.writelines(processed_data)
            
            print(f"Processed and saved: {file_name} -> {output_file_path}")
        else:
            print(f"Skipped (unknown format): {file_name}")


input_folder = './dnaSamples'  
output_folder = './processedSamples'  

process_all_files(input_folder, output_folder)


Processed and saved: 12724.23andme.10529 -> ./processedSamples/12724.23andme.10529_processed.csv
Processed and saved: 12755.23andme.10564 -> ./processedSamples/12755.23andme.10564_processed.csv
Processed and saved: 12725.ancestry.10530 -> ./processedSamples/12725.ancestry.10530_processed.csv
Processed and saved: 12701.23andme.10512 -> ./processedSamples/12701.23andme.10512_processed.csv
Skipped (unknown format): 12696.23andme.10507
Processed and saved: 12746.23andme.10554 -> ./processedSamples/12746.23andme.10554_processed.csv
Skipped (unknown format): .DS_Store
Processed and saved: 12731.ancestry.10536 -> ./processedSamples/12731.ancestry.10536_processed.csv
Processed and saved: 12752.23andme.10562 -> ./processedSamples/12752.23andme.10562_processed.csv
Processed and saved: 12673.ancestry.10482 -> ./processedSamples/12673.ancestry.10482_processed.csv
Processed and saved: 12669.ancestry.10477 -> ./processedSamples/12669.ancestry.10477_processed.csv
Processed and saved: 12726.23andme.10

In [14]:
import pandas as pd

def load_snp_data(input_folder):
    all_snp_data = []  
    
    for file_name in os.listdir(input_folder):
        if file_name.endswith('_processed.csv'):
            file_path = os.path.join(input_folder, file_name)
            
            df = pd.read_csv(file_path)
            df['Sample'] = file_name  
            all_snp_data.append(df)
    
    snp_data = pd.concat(all_snp_data, ignore_index=True)
    return snp_data

processed_folder = './processedSamples'

snp_data = load_snp_data(processed_folder)
snp_data.head()  


  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)


Unnamed: 0,RSID,CHROMOSOME,POSITION,RESULT,Sample
0,rs12564807,1,734462,AA,12755.23andme.10564_processed.csv
1,rs3131972,1,752721,AG,12755.23andme.10564_processed.csv
2,rs148828841,1,760998,AC,12755.23andme.10564_processed.csv
3,rs12124819,1,776546,AG,12755.23andme.10564_processed.csv
4,rs115093905,1,787173,GG,12755.23andme.10564_processed.csv


In [17]:

def calculate_snp_percentage(snp_data, rsid):
    
    snp_subset = snp_data[snp_data['RSID'] == rsid]
    
    total_samples = snp_data['Sample'].nunique()
    
    samples_with_snp = snp_subset['Sample'].nunique()
    
    if total_samples > 0:
        percentage = (samples_with_snp / total_samples) * 100
    else:
        percentage = 0.0  # To handle cases where there are no samples
    
    return percentage

rsid = input("Enter the RSID for analysis: ")

percentage = calculate_snp_percentage(snp_data, rsid)

if percentage > 0:
    print(f"Percentage of samples containing SNP {rsid}: {percentage:.2f}%")
else:
    print(f"The SNP {rsid} was not found in any sample.")


Percentage of samples containing SNP rs7853989: 87.50%


In [16]:

def calculate_all_snp_frequencies(snp_data):
    snp_counts = snp_data.groupby('RSID')['Sample'].nunique().reset_index()
    
    total_samples = snp_data['Sample'].nunique()
    
    snp_counts['Percentage'] = (snp_counts['Sample'] / total_samples) * 100
    
    snp_counts = snp_counts.sort_values(by='Percentage', ascending=False)
    return snp_counts

snp_frequencies = calculate_all_snp_frequencies(snp_data)
snp_frequencies.head()  


Unnamed: 0,RSID,Sample,Percentage
163275,rs1076438,24,100.0
1621666,rs6954727,24,100.0
1326276,rs4563166,24,100.0
1832087,rs7660199,24,100.0
484722,rs12424442,24,100.0
