Download Link: https://www.uniprot.org/help/downloads

#### Import dependencies

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

import seaborn as sns
import matplotlib.pyplot as plt

#### Download UniRed50 via seqkit

In [None]:
!seqkit fx2tab ~/Downloads/uniref50.fasta.gz > uniref50.tsv

#### Initiate PySpark

In [2]:
spark = SparkSession.builder \
    .appName("LargeFileProcessing") \
    .master("local[*]") \
    .config("spark.driver.memory", "20g") \
    .config("spark.executor.memory", "20g") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.3") \
    .config("spark.sql.shuffle.partitions", "300") \
    .config("spark.default.parallelism", "300") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/13 15:28:34 WARN Utils: Your hostname, Nakorns-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 172.31.226.60 instead (on interface en0)
26/01/13 15:28:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/13 15:28:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Filter only protein sequence column and save dataset in parquet format

In [None]:
df = spark.read.csv('uniref50.tsv', sep="\t").drop("_c0", "_c2").toDF('seq')
df.show(5)
df.write.parquet("uniref50.parquet")

+--------------------+
|                 seq|
+--------------------+
|MGRIRVWVGTSIPNPVN...|
|MSEQAPTFIKPLQSVVA...|
|MIPDALRVFIDIFGGVA...|
|MEEITQIKKRLSQTVRL...|
|MCFQQQILKAVENVTQH...|
+--------------------+
only showing top 5 rows


#### Filter only sequences length between 20 and 512 and save dataset in parquet format

In [None]:
df = spark.read.parquet('uniref50.parquet')
df = df.withColumn("seq_length", length('seq')).filter("seq_length >= 20 AND seq_length <= 512").limit(8_000_000)
df = df.drop('id').withColumn('id', monotonically_increasing_id())
df = df.select(['id'] + [c for c in df.columns if c != 'id']) # Reoder column, make id column as first column
df.write.parquet("uniref50_8M.parquet")
df.show(5)

+---+--------------------+----------+
| id|                 seq|seq_length|
+---+--------------------+----------+
|  0|MDDLQDTNDQLYQLADD...|        73|
|  1|PPSFIHKPDPQEVLPGS...|       143|
|  2|MWIEVGQVPAGVQKFQI...|       121|
|  3|VLKKKLQAFTLKVSSSS...|        66|
|  4|PPSSPVGPVKFIDATIS...|       359|
+---+--------------------+----------+
only showing top 5 rows


In [20]:
df = spark.read.parquet("uniref50_8M.parquet")
df.show()

+---+--------------------+----------+
| id|                 seq|seq_length|
+---+--------------------+----------+
|  0|MRTSLVKLHRYLGLGMA...|       379|
|  1|MKVVAVILAGGKGSRFG...|       375|
|  2|MTESTTIKRSSTAVLLG...|       380|
|  3|MNKLIVLILGLFITSSC...|       380|
|  4|MKTLRFQLYRVWVFFGR...|       380|
|  5|MNARTGVEMARAVADAV...|       380|
|  6|MGVTTLSISLGLGMLMV...|       284|
|  7|MFLLTLENNGDILLDTG...|       380|
|  8|MHPIELDRGTWTATCLA...|       378|
|  9|MVVRSCSTALGDGIRIT...|       380|
| 10|MTDSRKKNGFVAVANEI...|       380|
| 11|MKESEQLKEIRNVLDEI...|       297|
| 12|MKKTFYFTFIYLSLFST...|       380|
| 13|MSLFNTLNHSSKDEVDC...|       368|
| 14|MTGLELPVVDTDTYRQY...|       334|
| 15|MHSTRRYSPYIRPDHPQ...|       366|
| 16|AVKGAVLDIASGDANLT...|       380|
| 17|MSSFKIATSRSNVGILN...|       314|
| 18|MDLIKRAWVEISIENLK...|       380|
| 19|MIPSALNTPFAITGDGL...|       380|
+---+--------------------+----------+
only showing top 20 rows
