In [0]:
%sql
-- Browse every column of every row in the raw protein table.
SELECT
  *
FROM
  workspace.raw.protein

In [0]:
%sql
SELECT id, COUNT(id)
FROM workspace.raw.protein
WHERE blast_of_id IS NULL
GROUP BY id
HAVING COUNT(id) > 1
ORDER BY COUNT(id) DESC

In [0]:
%sql
SELECT * FROM workspace.raw.protein
WHERE id IN ("JAMSEZ010000004.1.g911")

In [0]:
%sql
SELECT *,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY record_update_ts DESC) AS rn
    FROM workspace.raw.protein
    WHERE blast_of_id IS NULL


In [0]:
%sql
MERGE INTO workspace.raw.protein AS target
USING (
    SELECT * FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY id ORDER BY record_update_ts DESC) AS rn
        FROM workspace.raw.protein
        WHERE blast_of_id IS NULL
    ) WHERE rn = 2
) AS source
ON target.id = source.id
AND target.record_create_ts = source.record_create_ts
AND target.record_update_ts = source.record_update_ts
WHEN MATCHED THEN
  DELETE


In [0]:
%sql
SELECT id, COUNT(id)
FROM workspace.raw.protein
WHERE blast_of_id IS NULL
GROUP BY id
HAVING COUNT(id) > 1
ORDER BY COUNT(id) DESC

In [0]:
from pyspark.sql.functions import (
    monotonically_increasing_id, regexp_extract, col, lag, lead, collect_list,
    concat_ws, current_timestamp, concat, lit, sort_array, struct,
)
from pyspark.sql.window import Window

# Read the FASTA file as one row per text line (single column named "value").
fasta_df = spark.read.text("/Volumes/workspace/raw/input/700.fasta")

# Tag every line with an increasing id so file order can be reconstructed later.
# NOTE(review): this assumes the reader yields lines in file order -- confirm
# for large, multi-split inputs.
fasta_df = fasta_df.withColumn("line_num", monotonically_increasing_id())

# Header lines start with ">"; everything after the ">" becomes the accession.
# The line number is renamed so the join below never has two columns called
# "line_num" coming from the same underlying DataFrame.
accession_line_df = fasta_df.filter(col("value").startswith(">")).select(
    col("line_num").alias("header_line_num"),
    regexp_extract(col("value"), ">(.+)", 1).alias("accession"),
)

# For each header, find the line number of the following header; it is NULL for
# the last record. The un-partitioned window moves all header rows to a single
# partition, which only involves the header lines, not the whole file.
header_order = Window.orderBy("header_line_num")
accession_line_df = accession_line_df.withColumn(
    "next_header_line_num", lead("header_line_num", 1).over(header_order)
)

# Attach each line (header included) to the record whose range contains it.
# Lines that precede the first header match no record and are dropped.
seq_df = fasta_df.join(
    accession_line_df,
    (col("line_num") >= col("header_line_num"))
    & (
        col("next_header_line_num").isNull()
        | (col("line_num") < col("next_header_line_num"))
    ),
).select("accession", "line_num", "value")

# collect_list gives no ordering guarantee after a shuffle, so collect
# (line_num, value) pairs and sort them by line_num before joining the text;
# otherwise sequence lines could be concatenated out of order.
seq_by_accession = (
    seq_df.groupBy("accession")
    .agg(sort_array(collect_list(struct("line_num", "value"))).alias("ordered_lines"))
    .select(
        "accession",
        concat_ws("\n", col("ordered_lines.value")).alias("fasta_sequence"),
    )
)

# Add columns for create and update timestamp, rename accession to id
final_df = seq_by_accession.withColumn("record_create_ts", current_timestamp()) \
                           .withColumn("record_update_ts", current_timestamp()) \
                           .withColumnRenamed("accession", "id") \
                           .select("record_create_ts", "record_update_ts", "id", "fasta_sequence")

final_df.display()


In [0]:
# Append the parsed FASTA records to the raw protein table.
# NOTE: append mode only adds rows; running this cell again inserts another
# copy of every id in final_df.
final_df.write.saveAsTable("workspace.raw.protein", mode="append")