In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, regexp_replace, when, expr
from pyspark.sql.functions import size, monotonically_increasing_id, concat_ws, collect_list, regexp_extract
from pyspark.sql.functions import sum as pyspark_sum

In [2]:
spark = SparkSession.builder \
    .appName("Amino Acid Frequency Calculation") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/05 07:58:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/05 07:58:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/04/05 07:58:28 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
file_path = "./as2/Q1_data/protein.fasta"
fasta_df = spark.read.text(file_path)

# Identify the lines starting with '>'
fasta_df = fasta_df.withColumn("is_header", when(col("value").startswith(">"), 1).otherwise(0))
fasta_df = fasta_df.withColumn("line_num", monotonically_increasing_id())
fasta_df = fasta_df.withColumn("group", expr("sum(int(is_header)) over (order by line_num)"))
fasta_df.show(5)

24/04/05 07:58:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/05 07:58:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/05 07:58:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/05 07:58:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/05 07:58:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

+--------------------+---------+--------+-----+
|               value|is_header|line_num|group|
+--------------------+---------+--------+-----+
|>UniRef50_A0A5A9P...|        1|       0|    1|
|MEEITQIKKRLSQTVRL...|        0|       1|    1|
|KKEVVAVAKKEEVLKKE...|        0|       2|    1|
|DIVPQMRDVSLPPKEEE...|        0|       3|    1|
|SLPPKKDEEIVCEKKEV...|        0|       4|    1|
+--------------------+---------+--------+-----+
only showing top 5 rows



In [5]:
# Filter out lines starting with '>'
filtered_df = fasta_df.filter(col("is_header") == 0)

# Merge the sequences between '>' lines
# merged_df = filtered_df.groupBy("group").agg(concat_ws("", collect_list("value")).alias("sequence"))
merged_df = filtered_df

merged_df.show(5)
merged_df.count()

24/04/05 07:59:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/05 07:59:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/05 07:59:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/05 07:59:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/05 07:59:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+--------------------+---------+--------+-----+
|               value|is_header|line_num|group|
+--------------------+---------+--------+-----+
|MEEITQIKKRLSQTVRL...|        0|       1|    1|
|KKEVVAVAKKEEVLKKE...|        0|       2|    1|
|DIVPQMRDVSLPPKEEE...|        0|       3|    1|
|SLPPKKDEEIVCEKKEV...|        0|       4|    1|
|KEHEEKETFVVLKKEII...|        0|       5|    1|
+--------------------+---------+--------+-----+
only showing top 5 rows



597583

In [7]:
# Explode the merged sequence into individual amino acids
amino_acid_df = merged_df.select(explode(split(col("value"), "")).alias("amino_acid"))

# Calculate the frequency of each amino acid
frequency_df = amino_acid_df.groupBy("amino_acid").count().orderBy("amino_acid")
frequency_df.show(23)

+----------+-------+
|amino_acid|  count|
+----------+-------+
|         A|3223081|
|         B|      6|
|         C| 564455|
|         D|2216904|
|         E|2674664|
|         F| 985877|
|         G|2653426|
|         H| 628384|
|         I|1726915|
|         K|1684031|
|         L|2851645|
|         M| 467474|
|         N|1316889|
|         P|2097950|
|         Q|1422769|
|         R|1789613|
|         S|2747798|
|         T|2795042|
|         V|2760761|
|         W| 351166|
|         X|   1347|
|         Y| 823096|
|         Z|      2|
+----------+-------+



In [8]:
file_path = "./as2/Q1_data/protein.fasta"
fasta_rdd = spark.sparkContext.textFile(file_path)

# Identify lines starting with '>'
def is_header(line):
    return line.startswith(">")

# Assign group to each line based on the occurrence of '>'
def assign_group(iterator):
    group = 0
    for line in iterator:
        if is_header(line):
            group += 1
        yield (group, line)

# Assign group to each line
grouped_rdd = fasta_rdd.mapPartitions(assign_group)

# Filter out lines starting with '>'
filtered_rdd = grouped_rdd.filter(lambda x: not is_header(x[1]))

# Merge the sequences between '>' lines
merged_rdd = filtered_rdd.groupByKey().mapValues(lambda lines: "".join(lines))

# Explode the merged sequence into individual amino acids
amino_acid_rdd = merged_rdd.flatMap(lambda x: list(x[1]))

# Count the frequency of each amino acid
frequency_rdd = amino_acid_rdd.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b).sortByKey()

# Print the result
print("Frequency of each amino acid:")
for amino_acid, count in frequency_rdd.collect():
    print(amino_acid, count)

                                                                                

Frequency of each amino acid:
A 3223081
B 6
C 564455
D 2216904
E 2674664
F 985877
G 2653426
H 628384
I 1726915
K 1684031
L 2851645
M 467474
N 1316889
P 2097950
Q 1422769
R 1789613
S 2747798
T 2795042
V 2760761
W 351166
X 1347
Y 823096
Z 2


In [10]:
# Calculate the total count of "STAT" motif across all proteins

stat_count_df = merged_df.withColumn("stat_occurrences", (size(split(col("value"), "STAT")) - 1))
stat_count_df.show(5)

total_stat_count = stat_count_df.agg({"stat_occurrences": "sum"}).collect()[0][0]

print("Total number of occurrences of 'STAT' motif across all proteins:", total_stat_count)

24/04/05 08:00:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/05 08:00:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/05 08:00:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/05 08:00:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/05 08:00:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+--------------------+---------+--------+-----+----------------+
|               value|is_header|line_num|group|stat_occurrences|
+--------------------+---------+--------+-----+----------------+
|MEEITQIKKRLSQTVRL...|        0|       1|    1|               0|
|KKEVVAVAKKEEVLKKE...|        0|       2|    1|               0|
|DIVPQMRDVSLPPKEEE...|        0|       3|    1|               0|
|SLPPKKDEEIVCEKKEV...|        0|       4|    1|               0|
|KEHEEKETFVVLKKEII...|        0|       5|    1|               0|
+--------------------+---------+--------+-----+----------------+
only showing top 5 rows

Total number of occurrences of 'STAT' motif across all proteins: 2052


In [12]:
course_path = "./as2/Q2_data/courses.csv"
instructor_path = "./as2/Q2_data/instructors.csv"
course_df = spark.read.csv(course_path, header = True)
instructor_df = spark.read.csv(instructor_path, header = True)

course_df.show(5), instructor_df.show(5)

+-------+--------------------+--------------------+---------+-----------+----------------------+--------------------+----------------+----------------+--------------+--------------------+
|     id|               title|                 url|   rating|num_reviews|num_published_lectures|             created|last_update_date|        duration|instructors_id|               image|
+-------+--------------------+--------------------+---------+-----------+----------------------+--------------------+----------------+----------------+--------------+--------------------+
| 567828|The Complete Pyth...|/course/complete-...|4.5927815|     452973|                   155|2015-07-29T00:12:23Z|      2021-03-14|  22 total hours|       9685726|https://img-c.ude...|
|1565838|The Complete 2023...|/course/the-compl...| 4.667258|     263152|                   490|2018-02-22T12:02:33Z|      2023-01-20|65.5 total hours|      31334738|https://img-c.ude...|
| 625204|The Web Developer...|/course/the-web-d...|4.6961474

(None, None)

In [13]:
instructor_df_renamed = instructor_df.withColumnRenamed("id", "instructor_id").withColumnRenamed("title", "instructor_title")
joined_df = course_df.join(instructor_df_renamed, instructor_df_renamed.instructor_id == course_df.instructors_id, how = "inner")
joined_df.drop(joined_df["instructors_id"])
joined_df.show(5)

+-------+--------------------+--------------------+---------+-----------+----------------------+--------------------+----------------+----------------+--------------+--------------------+------+-------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+
|     id|               title|                 url|   rating|num_reviews|num_published_lectures|             created|last_update_date|        duration|instructors_id|               image|_class|instructor_id|    instructor_title|      name|        display_name|           job_title|         image_50x50|       image_100x100|initials|                 url|
+-------+--------------------+--------------------+---------+-----------+----------------------+--------------------+----------------+----------------+--------------+--------------------+------+-------------+--------------------+----------+--------------------+--------------------+--------

In [14]:
course_df.createOrReplaceTempView("course")
instructor_df.createOrReplaceTempView("instructor")
joined_df.createOrReplaceTempView("course_with_instructor")

In [18]:
q2_2_df = spark.sql("""
    select 
        display_name,
        job_title,
        rating
    from
        course_with_instructor
    where
        title like "%spark%"
    and 
        created >= "2018-01-01 00:00:00"
    and 
        rating = (
            select 
                max(rating) 
            from 
                course_with_instructor 
            where 
                title like "%spark%" 
            and 
                created >= "2018-01-01 00:00:00"
        )
    order by
        rating desc
""")
q2_2_df.show(truncate = False)

+------------+-------------------------------------+---------+
|display_name|job_title                            |rating   |
+------------+-------------------------------------+---------+
|Deby Coles  |Sewer, Artist, Crafter and Instructor|4.6432705|
+------------+-------------------------------------+---------+



In [19]:
q2_3_df = spark.sql("""
    select 
        id,
        title,
        created,
        round(rating, 1) AS rating
    from
        course
    where
        lower(title) like "%interview%"
    order by
        rating desc,
        created desc
""")
q2_3_df.show(truncate = False)

+-------+------------------------------------------------------------+--------------------+------+
|id     |title                                                       |created             |rating|
+-------+------------------------------------------------------------+--------------------+------+
|4886926|Interview Oriented Data Structure Arrays & Linked List C|C++|2022-09-17T17:57:14Z|5.0   |
|4309400|CATIA V5 FOR JOBS INTERVIEW                                 |2021-09-20T12:54:23Z|5.0   |
|4829150|Réaliser des interviews au rendu professionnel (PARTIE 2)   |2022-08-12T14:54:06Z|4.9   |
|4722894|"The ""BigTech"" System Design Interview Bootcamp"          |2022-06-07T14:53:40Z|4.9   |
|4499476|Power BI Interview Questions and Answers                    |2022-01-17T11:08:03Z|4.9   |
|4266596|C#/.NET - 50 Essential Interview Questions (Junior Level)   |2021-08-27T13:46:55Z|4.9   |
|4189444|SAP MM Mock Interview Videos Part 1                         |2021-07-17T04:48:21Z|4.9   |
|4149490|M