In [6]:
# Locate the local Spark installation so `pyspark` is importable, then
# start a SparkSession (or reuse the one already running in this kernel).
import findspark
findspark.init()

from pyspark.sql import SparkSession

# NOTE(review): the app name and the "spark.some.config.option" setting look
# like the placeholders from the Spark SQL getting-started example; consider
# replacing them with project-specific values — confirm before changing.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [7]:
# Parental genotype-call tables and the glob matching the progeny samples.
ref_parent_file = "../../data/parquets/w1118.parquet"
alt_parent_file = "../../data/parquets/oregonr.parquet"
sample_file_glob = "../../data/parquets/WT-G0-*.parquet"

# Values of the `sample` column that identify each parent.
ref_parent_name = "w1118"
alt_parent_name = "oregonr"

# Thresholds applied when selecting reference sites
# (used as exclusive lower bounds on `quality` and `depth`).
ref_qual_cutoff = 200
ref_depth_cutoff = 30

# Where the derived tables are written.
ref_out_file = "../../data/parquets/reference.parquet"
sample_out_file = "../../data/parquets/progeny.parquet"

# Read both parents into a single DataFrame and register it for SQL as `df`.
parent_files = (ref_parent_file, alt_parent_file)
df = spark.read.parquet(*parent_files)

df.createOrReplaceTempView("df")

df.show()

+------+----------+--------+---------+-------+-------+--------+-----+------------+
|sample|chromosome|position|reference|variant|quality|genotype|depth|allele_depth|
+------+----------+--------+---------+-------+-------+--------+-----+------------+
| w1118|     chr2L|     223|        A|      .|28.2394|     0/0|    0|           0|
| w1118|     chr2L|     224|        T|      .|28.2394|     0/0|    0|           0|
| w1118|     chr2L|     225|        A|      .|28.2394|     0/0|    0|           0|
| w1118|     chr2L|     226|        T|      .|28.2394|     0/0|    0|           0|
| w1118|     chr2L|     226|        T|      .|31.7802|     0/0|    2|           0|
| w1118|     chr2L|     227|        A|      .| 34.543|     0/0|    2|           2|
| w1118|     chr2L|     228|        T|      .| 34.543|     0/0|    2|           2|
| w1118|     chr2L|     229|        G|      .| 34.543|     0/0|    2|           2|
| w1118|     chr2L|     230|        A|      .| 34.543|     0/0|    2|           2|
| w1

In [8]:
# Normalise the raw calls: cast `position` to an integer (`int_pos`) and add
# `mod_variant`, which is the reference base when `variant` is '.' and the
# reported variant otherwise.
parents_query = "SELECT sample, chromosome, CAST(position AS INTEGER) AS int_pos, quality, genotype, depth, allele_depth, reference, variant, (CASE WHEN variant='.' THEN reference ELSE variant END) AS mod_variant FROM df"

parents = spark.sql(parents_query)
parents.createOrReplaceTempView("parents")

parents.show()

+------+----------+-------+-------+--------+-----+------------+---------+-------+-----------+
|sample|chromosome|int_pos|quality|genotype|depth|allele_depth|reference|variant|mod_variant|
+------+----------+-------+-------+--------+-----+------------+---------+-------+-----------+
| w1118|     chr2L|    223|28.2394|     0/0|    0|           0|        A|      .|          A|
| w1118|     chr2L|    224|28.2394|     0/0|    0|           0|        T|      .|          T|
| w1118|     chr2L|    225|28.2394|     0/0|    0|           0|        A|      .|          A|
| w1118|     chr2L|    226|28.2394|     0/0|    0|           0|        T|      .|          T|
| w1118|     chr2L|    226|31.7802|     0/0|    2|           0|        T|      .|          T|
| w1118|     chr2L|    227| 34.543|     0/0|    2|           2|        A|      .|          A|
| w1118|     chr2L|    228| 34.543|     0/0|    2|           2|        T|      .|          T|
| w1118|     chr2L|    229| 34.543|     0/0|    2|          

In [10]:
# Keep the (chromosome, int_pos) positions that have at least two non-null
# `variant` rows in `parents`.
# NOTE(review): COUNT(variant) counts rows, not distinct samples or alleles,
# and `parents` can contain several rows for the same sample and position
# (w1118 chr2L:226 appears twice in the output above). A position seen in
# only one parent can therefore pass. If the intent is "called in both
# parents", COUNT(DISTINCT sample) may be what is wanted — confirm.
unique_variants_query = """SELECT chromosome, int_pos, COUNT(variant) AS n_variants FROM parents
    GROUP BY chromosome, int_pos
    HAVING n_variants >= 2"""

check_unique_variants = spark.sql(unique_variants_query)
check_unique_variants.createOrReplaceTempView("check_unique_variants")

check_unique_variants.show()

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/nas/longleaf/apps/spark/2.4.0/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/nas/longleaf/apps/spark/2.4.0/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/nas/longleaf/apps/spark/2.4.0/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving


Py4JError: An error occurred while calling o25.sql

In [None]:
# Candidate reference sites: rows of `parents` at positions that passed the
# check_unique_variants filter, excluding '0/1' genotype calls, keeping only
# rows whose reference and mod_variant are at most one character long, and
# requiring quality and depth strictly above the configured cutoffs.
#
# `temp_ref_query` is a str.format template. Spark 2.4's `spark.sql` accepts
# only the query string (keyword formatting arguments arrived in Spark 3.4),
# so the cutoffs are substituted here before the query is sent.
temp_ref_query = """SELECT
    sample,
    parents.chromosome,
    parents.int_pos,
    reference,
    mod_variant,
    quality,
    genotype,
    depth,
    allele_depth
    FROM parents
    INNER JOIN check_unique_variants ON (
        parents.chromosome = check_unique_variants.chromosome
        AND parents.int_pos = check_unique_variants.int_pos
        )
    WHERE genotype!='0/1'
        AND quality > {ref_qual_cutoff}
        AND LENGTH(mod_variant) <= 1
        AND LENGTH(reference) <= 1
        AND depth > {ref_depth_cutoff}"""

temp_ref = spark.sql(
    temp_ref_query.format(
        ref_qual_cutoff=ref_qual_cutoff,
        ref_depth_cutoff=ref_depth_cutoff,
    )
)
temp_ref.createOrReplaceTempView("temp_ref")

temp_ref.show()
    

In [2]:
    parent_query = """SELECT
    sample,
    chromosome,
    int_pos,
    mod_variant AS {allele}
    FROM temp_ref
    WHERE sample={parent_name}"""

    ref_parent = spark.sql(
        ref_query,
        allele="ref_allele",
        parent_name=ref_parent_name
    )
    ref_parent.createOrReplaceTempView("ref_parent")

    alt_parent = spark.sql(
        ref_query,
        allele="alt_allele",
        parent_name=alt_parent_name
    )
    alt_parent.createOrReplaceTempView("alt_parent")

    ref = spark.sql("""SELECT * FROM ref_parent
        FULL JOIN alt_parent ON (
            ref_parent.chromosome = alt_parent.chromosome
            AND ref_parent.int_pos = alt_parent.int_pos
            )
        ORDER BY ref_parent.chromosome, ref_parent.int_pos"""
    )
    ref.createOrReplaceTempView("ref")

    ref_out = spark.sql(
        """SELECT chromosome, int_pos AS position, ref_allele AS reference, alt_allele AS variant FROM ref""",
        ref=ref
    )
    ref_out.write.parquet(ref_out_file)

TypeError: 'bytes' object cannot be interpreted as an integer