In [1]:
from json import loads
from operator import add
from time import perf_counter
from time import time

from pyspark.sql import SparkSession

from seq_utils import quality_adjusted_smith_waterman, substitution_matrix, gap_penalty

In [2]:
# Create (or reuse, via getOrCreate) a local Spark session and keep its
# SparkContext handle for the RDD API used throughout the notebook.
# NOTE(review): "local[1]" runs a single worker thread, so inputs read with the
# default partitioning end up in one partition; cross-partition merge code
# paths (e.g. combineByKey's mergeCombiners) are barely exercised here.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("SparkBlast.com")
    .getOrCreate()
)
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Function to create the initial combiner list
def createCombiner(v):
    return [v]

def mergeValue(c, v):
    """Fold one more alignment into a key's list of best-scoring alignments.

    Used as the ``mergeValue`` argument of ``RDD.combineByKey``. ``c`` holds
    alignments that all share the best score seen so far; the score is read
    from index 1 of each tuple.

    Args:
        c: non-empty combiner list (as built by ``createCombiner``).
        v: an alignment tuple ``(transcript_id, score, ...)``.

    Returns:
        ``c`` unchanged if its best score is higher than ``v``'s; ``c`` with
        ``v`` appended in place if the scores are equal; otherwise a new list
        ``[v]`` because ``v`` is the new best.
    """
    best_score, new_score = c[0][1], v[1]
    if best_score > new_score:
        return c
    if best_score == new_score:
        # Ties are kept so every equally good hit is counted later.
        c.append(v)
        return c
    return [v]

def mergeCombiners(a, b):
    """Merge two per-key combiner lists, keeping only the best-scoring alignments.

    Used as the ``mergeCombiners`` argument of ``RDD.combineByKey`` when the
    same key has combiners from different partitions. Each list holds
    alignments sharing one score (index 1 of each tuple).

    Args:
        a: non-empty combiner list.
        b: non-empty combiner list.

    Returns:
        ``a`` if its score is higher, ``b`` if its score is higher, otherwise
        ``a`` extended in place with ``b`` (ties keep every alignment).
    """
    if a[0][1] > b[0][1]:  # 'a' holds the better score
        return a
    if a[0][1] < b[0][1]:  # 'b' holds the better score
        return b
    # Equal scores: list.extend mutates in place and returns None, so the
    # extended list must be returned explicitly rather than extend()'s result.
    a.extend(b)
    return a

For 10f x 10q func took: 1.4822 sec

For 100f x 100q func took: 281.5245 sec, 242.1444 sec  
[('ENST00000390360.3', 4), ('ENST00000634605.1', 1), ('ENST00000634176.1', 2), ('ENST00000390392.3', 11), ('ENST00000390424.2', 4), ('ENST00000390387.3', 11), ('ENST00000390425.2', 5), ('ENST00000390357.3', 9), ('ENST00000390427.3', 6), ('ENST00000634383.1', 6), ('ENST00000634111.1', 6), ('ENST00000390363.2', 2), ('ENST00000390426.2', 2), ('ENST00000390423.2', 3), ('ENST00000542354.1', 1), ('ENST00000390361.3', 2), ('ENST00000390469.2', 2), ('ENST00000455382.2', 3), ('ENST00000631824.1', 4), ('ENST00000390362.1', 1), ('ENST00000632187.1', 3), ('ENST00000535880.2', 1), ('ENST00000390381.3', 3), ('ENST00000632248.1', 3), ('ENST00000633265.1', 1), ('ENST00000390353.2', 1), ('ENST00000547918.2', 1), ('ENST00000632308.1', 1), ('ENST00000633313.1', 1)]

In [4]:
# Start timer to measure the execution time. perf_counter() is a monotonic,
# high-resolution clock meant for elapsed-time measurement; time() is
# wall-clock time and can jump if the system clock is adjusted.
ts = perf_counter()

# Read cDNA records and scRNA-seq read records from HDFS. Each text line is
# parsed as one JSON document.
cdnas_rdd = sc.textFile("hdfs://localhost:9000/bigdata/cdnas_100").map(loads)
sequences_rdd = sc.textFile("hdfs://localhost:9000/bigdata/sequences_100").map(loads)

# Pipeline:
#   1. cartesian    -> every (cDNA, read) pair
#   2. map          -> (read barcode, (transcript_id, *alignment result)) via
#                      quality-adjusted Smith-Waterman of the read (with its
#                      quality string) against the cDNA sequence
#   3. combineByKey -> per barcode, all alignments tied for the best score
#   4. flatMap + reduceByKey -> per transcript, the number of best-scoring hits
temp = (
    cdnas_rdd.cartesian(sequences_rdd)
    .map(lambda pair: (
        pair[1]['barcode'],
        (pair[0]['transcript_id'],
         *quality_adjusted_smith_waterman(pair[1]['sequence'], pair[1]['quality'], pair[0]['sequence'],
                                          substitution_matrix, gap_penalty)),
    ))
    .combineByKey(createCombiner, mergeValue, mergeCombiners)
    .flatMap(lambda kv: [(hit[0], 1) for hit in kv[1]])
    .reduceByKey(add)
    .collect()
)

# End timer to measure the execution time
te = perf_counter()
# Print the time taken for execution
print('func took: %2.4f sec' % (te-ts))
# Print the results
print(temp)

[Stage 0:>                                                          (0 + 1) / 1]

func took: 242.1444 sec
[('ENST00000390360.3', 4), ('ENST00000634605.1', 1), ('ENST00000634176.1', 2), ('ENST00000390392.3', 11), ('ENST00000390424.2', 4), ('ENST00000390387.3', 11), ('ENST00000390425.2', 5), ('ENST00000390357.3', 9), ('ENST00000390427.3', 6), ('ENST00000634383.1', 6), ('ENST00000634111.1', 6), ('ENST00000390363.2', 2), ('ENST00000390426.2', 2), ('ENST00000390423.2', 3), ('ENST00000542354.1', 1), ('ENST00000390361.3', 2), ('ENST00000390469.2', 2), ('ENST00000455382.2', 3), ('ENST00000631824.1', 4), ('ENST00000390362.1', 1), ('ENST00000632187.1', 3), ('ENST00000535880.2', 1), ('ENST00000390381.3', 3), ('ENST00000632248.1', 3), ('ENST00000633265.1', 1), ('ENST00000390353.2', 1), ('ENST00000547918.2', 1), ('ENST00000632308.1', 1), ('ENST00000633313.1', 1)]


                                                                                

In [None]:
# Rebuild the whole pipeline step by step so the cell runs on a fresh kernel.
# Only a cell's last expression is displayed, so intermediate `.take(2)` peeks
# would be discarded while still launching Spark jobs; they are omitted here
# (the cells below display those samples one at a time).
cdnas_rdd = sc.textFile("hdfs://localhost:9000/bigdata/cdnas_100").map(loads)
sequences_rdd = sc.textFile("hdfs://localhost:9000/bigdata/sequences_100").map(loads)
alignment_rdd = cdnas_rdd.cartesian(sequences_rdd) \
    .map(lambda seq: (seq[1]['barcode'], (seq[0]['transcript_id'], *quality_adjusted_smith_waterman(seq[1]['sequence'], seq[1]['quality'], seq[0]['sequence'], substitution_matrix, gap_penalty))))
# Combine the alignments computed just above (not a pre-existing results_rdd),
# keeping every best-scoring alignment per barcode.
results_rdd = alignment_rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
# Count best-scoring hits per transcript from the combined results.
results_rdd.flatMap(lambda x: [(y[0], 1) for y in x[1]]).reduceByKey(add).collect()

In [5]:
# Peek at two parsed cDNA records to check their JSON layout
# (keys seen in the output: 'transcript_id', 'attributes', 'sequence').
cdnas_rdd.take(num=2)

[{'transcript_id': 'ENST00000415118.1',
  'attributes': {'transcript_id': 'ENST00000415118.1',
   'seqtype': 'cdna',
   'chromosome': 'GRCh38:14:22438547:22438554:1 ',
   'gene': 'ENSG00000223997.1 ',
   'gene_biotype': 'TR_D_gene ',
   'transcript_biotype': 'TR_D_gene ',
   'gene_symbol': 'TRDD1 ',
   'description': 'T cell receptor delta diversity 1 [Source:HGNC Symbol;Acc:HGNC:12254]'},
  'sequence': 'GAAATAGT'},
 {'transcript_id': 'ENST00000448914.1',
  'attributes': {'transcript_id': 'ENST00000448914.1',
   'seqtype': 'cdna',
   'chromosome': 'GRCh38:14:22449113:22449125:1 ',
   'gene': 'ENSG00000228985.1 ',
   'gene_biotype': 'TR_D_gene ',
   'transcript_biotype': 'TR_D_gene ',
   'gene_symbol': 'TRDD3 ',
   'description': 'T cell receptor delta diversity 3 [Source:HGNC Symbol;Acc:HGNC:12256]'},
  'sequence': 'ACTGGGGGATACG'}]

In [6]:
# Peek at two parsed scRNA-seq read records to check their JSON layout
# (keys seen in the output: 'barcode', 'attributes', 'sequence', 'quality',
# 'bc_quality').
sequences_rdd.take(num=2)

[{'barcode': 'GNGGGTAAGTGTTGAAGGATTTTTATGG',
  'attributes': {'instrument': 'A00519',
   'run_number': '986',
   'flowcell_ID': 'HFFLJDSX2',
   'lane': '4',
   'tile': '1101',
   'x_pos': '15510',
   'y_pos': '1016',
   'read': '2',
   'is_filtered': 'N',
   'control number': '0',
   'i7_index': 'GAGACGCACG',
   'i5_index': 'ATGTTCATAG'},
  'sequence': 'AAGCAGTGGTATCAACGCAGAGTACATGGGGCTATGCTGGAGGTTTATATGAAATACAGATAAAGCACACAAAGATTGAGTCTATGACAA',
  'quality': 'F:FFF:,,FFF,FFFFFF:,FF:FF:FF,FFF,F,F,F:FFFF:FFF:,,F:F:FFFFFFF:FFFFFFFFF,FF:,:FFFFFF,FF:FF:',
  'bc_quality': 'F#F:FFF,:,FF,FF,FFFFFF,F::,:'},
 {'barcode': 'ANTGACTTCGTTCCTGTAAACTGATTCT',
  'attributes': {'instrument': 'A00519',
   'run_number': '986',
   'flowcell_ID': 'HFFLJDSX2',
   'lane': '4',
   'tile': '1101',
   'x_pos': '16089',
   'y_pos': '1016',
   'read': '2',
   'is_filtered': 'N',
   'control number': '0',
   'i7_index': 'GAGACGCACG',
   'i5_index': 'ATGTTCATAG'},
  'sequence': 'GGGGGATAGAAAAGAAATAAGCAGGCCAGGCTCAGTGGC

In [8]:
# Pair every cDNA with every read (Cartesian product) and score each pair with
# the quality-adjusted Smith-Waterman alignment of the read against the cDNA.
# Each element becomes (read barcode, (transcript_id, *alignment fields)); per
# the sample output, the alignment fields are a score followed by two aligned
# strings.
alignment_rdd = cdnas_rdd.cartesian(sequences_rdd).map(
    lambda pair: (
        pair[1]['barcode'],
        (
            pair[0]['transcript_id'],
            *quality_adjusted_smith_waterman(
                pair[1]['sequence'],
                pair[1]['quality'],
                pair[0]['sequence'],
                substitution_matrix,
                gap_penalty,
            ),
        ),
    )
)
alignment_rdd.take(2)

[('GNGGGTAAGTGTTGAAGGATTTTTATGG',
  ('ENST00000415118.1', 9.9, 'GAAATA', 'GAAATA')),
 ('ANTGACTTCGTTCCTGTAAACTGATTCT',
  ('ENST00000415118.1', 11.1, 'GAAATA', 'GAAATA'))]

In [10]:
# Use combineByKey to aggregate alignments by barcode: for each barcode keep
# every alignment tied for the highest score. Built from alignment_rdd (the
# previous cell) rather than reassigning results_rdd from itself, so the cell
# is idempotent and works on a freshly restarted kernel.
results_rdd = alignment_rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
results_rdd.take(2)

                                                                                

[('GNGGGTAAGTGTTGAAGGATTTTTATGG',
  [('ENST00000390360.3',
    51.275000000000055,
    'AAGCAGTGGTAT-CA-ACG-CAGAGTACATGGGGCT-ATGCTGGAGGTTTATATGAAATACAG-ATAAAGCAC--ACAAAGATTGAGTCTATGA',
    'ATGTACTGGTATAGACAAGATCTAGGAC-TGGGGCTAAGGCTCATCCATTAT-TCAAATACTGCAGGTACCACTGGCAAAGGAGAAGTCCCTGA')]),
 ('ANTGACTTCGTTCCTGTAAACTGATTCT',
  [('ENST00000634605.1',
    59.5500000000001,
    'GGGGATAGAAAAGAAATAAGCAGGCCAG-GCTCA-GTG-GCT-C-A-TGCCTGTAATCCTAGCATTT--TGGGA--GGCTGAGGCAGCAGAACTGCCTG',
    'GGTCACAGAGAAGGGA-AAGGATG-TAGAGCTCAGGTGTGATCCAATTTCAGGTCATACTGCCCTTTACTGGTACCGACAGAGCCTGGGGCAGGGCCTG')])]

In [11]:
# Flatten each barcode's best-hit list into (transcript_id, 1) pairs and sum
# them per transcript, i.e. count best-scoring hits per transcript (tied hits
# each count). Uses results_rdd produced by the combineByKey cell above.
results_rdd.flatMap(lambda kv: [(hit[0], 1) for hit in kv[1]]).reduceByKey(add).collect()

[('ENST00000390360.3', 4),
 ('ENST00000634605.1', 1),
 ('ENST00000634176.1', 2),
 ('ENST00000390392.3', 11),
 ('ENST00000390424.2', 4),
 ('ENST00000390387.3', 11),
 ('ENST00000390425.2', 5),
 ('ENST00000390357.3', 9),
 ('ENST00000390427.3', 6),
 ('ENST00000634383.1', 6),
 ('ENST00000634111.1', 6),
 ('ENST00000390363.2', 2),
 ('ENST00000390426.2', 2),
 ('ENST00000390423.2', 3),
 ('ENST00000542354.1', 1),
 ('ENST00000390361.3', 2),
 ('ENST00000390469.2', 2),
 ('ENST00000455382.2', 3),
 ('ENST00000631824.1', 4),
 ('ENST00000390362.1', 1),
 ('ENST00000632187.1', 3),
 ('ENST00000535880.2', 1),
 ('ENST00000390381.3', 3),
 ('ENST00000632248.1', 3),
 ('ENST00000633265.1', 1),
 ('ENST00000390353.2', 1),
 ('ENST00000547918.2', 1),
 ('ENST00000632308.1', 1),
 ('ENST00000633313.1', 1)]