In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) the Spark session for this notebook, binding the driver
# to the loopback address.
spark = (
    SparkSession.builder
    .appName('DNABaseCount')
    .config("spark.driver.bindAddress", "127.0.0.1")
    .getOrCreate()
)

### 1. Create an RDD[String] from the Input

In [3]:
# Relative path of the FASTA-formatted text file used as input.
input_path = "fasta_example.txt"

In [4]:
# Load the input file as an RDD of strings, one element per line of text.
records_rdd = spark.sparkContext.textFile(input_path)

In [5]:
# Show every line of the input; reasonable only because the example file is tiny.
records_rdd.collect()


['>Seq1 [organism=Carpodacus mexicanus] [clone=6b] actin (act) mRNA, partial cds',
 'CCTTTATCTAATCTTTGGAGCATGAGCTGGCATAGTTGGAACCGCCCTCAGCCTCCTCATCCGTGCAGAA',
 'TAATAATTTTCTTTATAGTAATACCAATCATGATCGGTGGTTTCGGAAACTGACTAGTCCCACTCATAAT',
 '>Seq2 [organism=uncultured bacillus sp.] [isolate=A2] corticotropin (CT) gene, complete cds',
 'GGTAGGTACCGCCCTAAGNCTCCTAATCCGAGCAGAACTANGCCAACCCGGAGCCCTTCTGGGAGACGAC',
 'TCAACACCACCTTCTTTGACCCAGCAGGAGGAGGAGACCCAGTACTATACCAGCACCTATTCTGATTCTT',
 '>Seq3 [organism=Phalaenopsis equestris var. leucaspis]',
 'CCTATACCTAATTTTCGGCGCATGAGCCGGAATGGTGGGTACCGCTCTAAGCCTCCTCATTCGAGCAGAA',
 'CTAGGCCAACCCGGAGCCCTTCTGGGAGACGACCAAGTCTACAACGTGGTTGTCACGGCCCATGCCTTCG',
 '>Seq9 [organism=Petunia integrifolia subsp. inflata]',
 'TAGTTGGAACAGCCCTCAGCCTACTCATCCGAGCAGAACTAGGCCAACCCGGAACCCTCCTGGGAGATGA',
 'CCAAATCTACAATGTAATCGTCACTGCCCATGCCTTCGTAATAATCTTCTTCATAGTAATACCAGTCATA']

### 2. Define a Mapper Function

In [6]:
def process_FASTA_record(fasta_record):
    """Map one line of a FASTA file to a list of (key, 1) pairs.

    Parameters
    ----------
    fasta_record : str
        A single line of the input: either a header line (starts with ">")
        or a line of sequence letters.

    Returns
    -------
    list of (str, int)
        [("seq", 1)] for a header line, so that summing the "seq" key counts
        the number of sequences; otherwise one (letter, 1) pair per character,
        lowercased so that "A" and "a" share a key. A blank line yields an
        empty list.
    """
    # Drop surrounding whitespace (trailing spaces, a stray carriage return)
    # so it is not tallied as if it were a DNA letter.
    record = fasta_record.strip()

    if record.startswith(">"):
        # The "seq" key counts the number of FASTA sequences.
        return [("seq", 1)]

    # Deliberately no print() here: this function runs once per input line
    # inside the Spark job, so printing would flood the logs.
    return [(letter, 1) for letter in record.lower()]

In [7]:
# Apply the mapper to every line and flatten the per-line lists into a
# single RDD of (key, 1) pairs.
pairs_rdd = records_rdd.flatMap(process_FASTA_record)

In [8]:
# Peek at the first five (key, 1) pairs emitted by the mapper.
pairs_rdd.take(5)

[('seq', 1), ('c', 1), ('c', 1), ('t', 1), ('t', 1)]

### 3. Find the Frequencies of DNA Letters

In [9]:
# Add up the per-occurrence 1s for each key to obtain its total count.
frequencies_rdd = pairs_rdd.reduceByKey(
    lambda count_a, count_b: count_a + count_b
)

In [10]:
# Bring the final totals back as a plain dict: one entry per letter, plus "seq".
frequencies_rdd.collectAsMap()

{'seq': 4, 'c': 165, 'g': 115, 't': 134, 'a': 144, 'n': 2}