In [1]:
from pyspark.sql import SparkSession
import time

# Session entry point (DataFrame-era API): build a session against the
# cluster master, or reuse one that already exists.
spark_session = (
    SparkSession.builder
    .master("spark://group101:7077")
    .appName("Average_protein")
    .getOrCreate()
)

# RDD entry point (older API): the SparkContext belonging to the session.
spark_context = spark_session.sparkContext

In [2]:
# Record the wall-clock start time; the elapsed time is derived from this
# value in a later cell.
start_time = time.time()

In [3]:
# Function that counts how often each distinct character occurs in a sequence
def count_occur(str):
    """Count how many times each distinct element occurs in ``str``.

    Args:
        str: The sequence to tally (a sequence string in this notebook).
            The parameter name shadows the built-in ``str``; it is kept
            unchanged so existing callers are not affected.

    Returns:
        dict: Maps each distinct element of ``str`` to the number of times
        it occurs. Empty when ``str`` is empty.
    """
    # Imported inside the function so the function is self-contained when it
    # is used from a lambda passed to an RDD transformation.
    from collections import Counter

    # A single pass over the input replaces one ``count`` scan per distinct
    # key; the plain-dict return type is preserved.
    return dict(Counter(str))

In [4]:
def count_all(str):
    """Return the number of elements in ``str``.

    Args:
        str: The sequence to measure (a sequence string in this notebook).
            The parameter name shadows the built-in ``str``; it is kept
            unchanged so existing callers are not affected.

    Returns:
        int: The element count; 0 for an empty input.
    """
    try:
        # Sized inputs (strings, lists, ...) need no intermediate list copy.
        return len(str)
    except TypeError:
        # Unsized iterables such as generators are counted by consuming
        # them, which is what building a list from them did before.
        return sum(1 for _ in str)

In [5]:
def add(dict1, dict2):
    """Merge two count dictionaries by summing the values of shared keys.

    A key that is present in only one input is treated as having the value
    0 in the other. A new dictionary is returned; neither input is modified.
    """
    merged = {}
    for key in set(dict1).union(dict2):
        merged[key] = dict1.get(key, 0) + dict2.get(key, 0)
    return merged

In [6]:
# Load the sequence file as an RDD of text lines, addressed with a file:// URI.
# NOTE(review): with a cluster master, this path presumably has to be readable
# at the same location on every worker node — confirm.
fasta_file = spark_context.textFile('file:///home/ubuntu/data/pdb_seqres4.txt')

In [7]:
# Partition the FASTA lines: any line containing ">" is treated as a header,
# every other line as a sequence line.
headers = fasta_file.filter(lambda line: ">" in line)
sequences = fasta_file.filter(lambda line: not (">" in line))

In [8]:
# Pair every line with its position in its RDD, giving (line, index) tuples.
# NOTE(review): the later join on this index assumes the n-th header belongs
# to the n-th sequence line, i.e. headers and sequences strictly alternate in
# the input file — confirm.
headers_indexed = headers.zipWithIndex()
sequences_indexed = sequences.zipWithIndex()

In [9]:
# Flip each (line, index) pair into (index, line) so the index can serve as
# the key of the join in the next step.
sequences_indexed_inverted = sequences_indexed.map(lambda pair: (pair[1], pair[0]))
headers_indexed_inverted = headers_indexed.map(lambda pair: (pair[1], pair[0]))

In [10]:
# Match every sequence with its header through the shared index; each element
# becomes (index, (sequence, header)).
joined_fasta = sequences_indexed_inverted.join(headers_indexed_inverted)
# Keep only the entries whose header text contains "protein".
prot_sequences = joined_fasta.filter(lambda record: "protein" in record[1][1])

In [11]:
# Per-protein tally: one {character: occurrence count} dict per sequence.
ind_dict = prot_sequences.map(lambda record: count_occur(record[1][0]))
# To inspect the first two tallies, run: ind_dict.take(2)

In [12]:
# Sum the per-protein tallies into one {character: total count} dict.
# This is the only Spark action this cell needs.
all_aa_occ = ind_dict.reduce(add)
# count_occur counts every character of a sequence exactly once, so the grand
# total of the tallies equals the summed sequence lengths. Deriving it locally
# avoids a second action that would recompute the whole uncached lineage.
sum_lengths = sum(all_aa_occ.values())
# Relative frequency of each character across all protein sequences.
result = {key: count / sum_lengths for key, count in all_aa_occ.items()}
print(result)

{'P': 0.046321832081362375, 'K': 0.05960191786714657, 'C': 0.013337454860752107, 'D': 0.05555970162922566, 'A': 0.07992560654223138, 'I': 0.05570616612050634, 'R': 0.05269718196823905, 'L': 0.08940539061097663, 'E': 0.06572919486526313, 'H': 0.026479981958879073, 'Z': 5.046310552642776e-07, 'S': 0.06247436193888671, 'M': 0.023248343371005724, 'W': 0.013170552811733217, 'Y': 0.03393936345763929, 'F': 0.03854759818419542, 'Q': 0.037628506167226954, 'G': 0.07387441669322521, 'V': 0.07016072461730088, 'U': 1.5325832048766948e-06, 'N': 0.04170209363575013, 'B': 3.0838564488372514e-07, 'O': 1.869003908386213e-08, 'X': 0.004952608041695832, 'T': 0.0555346382868142}


In [13]:
# Elapsed wall-clock seconds since start_time was recorded.
execution_time = time.time() - start_time

In [14]:
# Show the measured run time in seconds.
print(execution_time)

168.09951519966125


In [15]:
# Stop the SparkContext so the cores it holds are released for other applications.
spark_context.stop()