In [13]:
import pyspark.sql.functions as f
from pyspark.sql.types import DoubleType
import math
from pyspark.sql.functions import udf

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

from pyspark.sql import SparkSession

In [14]:
# Initialize a Spark session
spark = SparkSession.builder.appName("ReadGZFile").getOrCreate()

# Replace 'your_file.txt.gz' with the actual path to your .txt.gz file
file_path = '/Users/hn9/Desktop/locus_dentist_test.txt.gz'

filtered_StudyLocus = spark.read.option("header", "true").option("delimiter", "\t").csv(file_path)


In [15]:
from pyspark.sql import functions as f
from pyspark.sql import Window
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

def calculate_dentist(filtered_StudyLocus, lead_snp_ID, n_sample=500000, r2_threshold=0.6, nlog10p_dentist_s_threshold=1):
    # Calculate 'r' using aggregation
    agg_result = filtered_StudyLocus.agg((F.sum("R2") * n_sample).alias("R2_sum"), (F.count("R2") * n_sample).alias("R2_count"))
    agg_result = agg_result.withColumn("r", agg_result["R2_sum"] / agg_result["R2_count"])
    filtered_StudyLocus = filtered_StudyLocus.crossJoin(agg_result.select("r"))

    # Find the lead SNP
    lead_idx_snp = filtered_StudyLocus.filter(filtered_StudyLocus.ID == lead_snp_ID).first()

    # Calculate 't_dentist_s' and 'dentist_outlier'
    lead_z = lead_idx_snp.Z
    filtered_StudyLocus = filtered_StudyLocus.withColumn(
        "t_dentist_s",
        ((filtered_StudyLocus.Z - filtered_StudyLocus.r * lead_z) ** 2) / (1 - filtered_StudyLocus.r ** 2)
    )
    filtered_StudyLocus = filtered_StudyLocus.withColumn("t_dentist_s", F.when(filtered_StudyLocus["t_dentist_s"] < 0, float("inf")).otherwise(filtered_StudyLocus["t_dentist_s"]))
    def calc_nlog10p_dentist_s(t_dentist_s):
        return math.log(1 - math.exp(-t_dentist_s)) / -math.log(10)

    udf_calc_nlog10p_dentist_s = F.udf(calc_nlog10p_dentist_s, DoubleType())
    filtered_StudyLocus = filtered_StudyLocus.withColumn("nlog10p_dentist_s", udf_calc_nlog10p_dentist_s(filtered_StudyLocus["t_dentist_s"]))

    #n_dentist_s_outlier = filtered_StudyLocus.filter((filtered_StudyLocus.R2 > r2_threshold) & (filtered_StudyLocus.nlog10p_dentist_s > nlog10p_dentist_s_threshold)).count()

    filtered_StudyLocus = filtered_StudyLocus.withColumn(
        "dentist_outlier",
        F.when((filtered_StudyLocus.R2 > r2_threshold) & (filtered_StudyLocus.nlog10p_dentist_s > nlog10p_dentist_s_threshold), 1).otherwise(0)
    )
    return filtered_StudyLocus



In [17]:
test = calculate_dentist(filtered_StudyLocus, lead_snp_ID="1_155162560_G_A")

In [18]:
test.show(5)

+---------------+-----+---------+---+---+-------------------+------+-----------+--------------------+-------------------+-------------------+---------------+
|             ID|chrom|      pos|ref|alt|                  Z|  pval|         R2|                   r|        t_dentist_s|  nlog10p_dentist_s|dentist_outlier|
+---------------+-----+---------+---+---+-------------------+------+-----------+--------------------+-------------------+-------------------+---------------+
|1_154662946_C_T|    1|154662946|  C|  T|-0.0635976583288984|0.9494| 1.3828e-05|0.008661797923748287|0.02633732386848109| 1.5851348809150372|              0|
|1_154662972_C_T|    1|154662972|  C|  T|   1.47142857142857|0.1413|0.000146955|0.008661797923748287| 1.8845689564218693|0.07155012982540826|              0|
|1_154663409_G_T|    1|154663409|  G|  T|  0.686363636363636|0.4926|1.10038e-06|0.008661797923748287|0.34539322804009587| 0.5345307684064269|              0|
|1_154663800_G_A|    1|154663800|  G|  A|  0.7272727

In [3]:
import math
# Define constants
n_sample = 500000
r2_threshold = 0.6
nlog10p_dentist_s_threshold = 1
lead_snp_ID = "1_155162560_G_A"

# Calculate 'r' using aggregation
agg_result = filtered_StudyLocus.agg((F.sum("R2") * n_sample).alias("R2_sum"), (F.count("R2") * n_sample).alias("R2_count"))
agg_result = agg_result.withColumn("r", agg_result["R2_sum"] / agg_result["R2_count"])

# Join the 'r' result back to the original DataFrame on a common column, for example, a column 'key' if you have one
filtered_StudyLocus = filtered_StudyLocus.crossJoin(agg_result.select("r"))

# Find the lead SNP
lead_idx_snp = filtered_StudyLocus.filter(filtered_StudyLocus.ID == lead_snp_ID).first()

# Calculate 't_dentist_s' and 'dentist_outlier'
lead_z = lead_idx_snp.Z
filtered_StudyLocus = filtered_StudyLocus.withColumn(
    "t_dentist_s",
    ((df.Z - df.r * lead_z) ** 2) / (1 - df.r ** 2)
)
df = df.withColumn("t_dentist_s", F.when(df["t_dentist_s"] < 0, float("inf")).otherwise(df["t_dentist_s"]))

# Calculate 'nlog10p_dentist_s' using a plain Python function
import math

def calc_nlog10p_dentist_s(t_dentist_s):
    return math.log(1 - math.exp(-t_dentist_s)) / -math.log(10)

udf_calc_nlog10p_dentist_s = F.udf(calc_nlog10p_dentist_s, DoubleType())
df = df.withColumn("nlog10p_dentist_s", udf_calc_nlog10p_dentist_s(df["t_dentist_s"]))

n_dentist_s_outlier = df.filter((df.R2 > r2_threshold) & (df.nlog10p_dentist_s > nlog10p_dentist_s_threshold)).count()

df = df.withColumn(
    "dentist_outlier",
    F.when((df.R2 > r2_threshold) & (df.nlog10p_dentist_s > nlog10p_dentist_s_threshold), 1).otherwise(0)
)


23/10/18 09:37:01 WARN ExtractPythonUDFFromJoinCondition: The join condition:(calc_nlog10p_dentist_s(CASE WHEN ((POWER((cast(Z#22 as double) - (r#48 * 11.3930197268589)), 2.0) / (1.0 - POWER(r#48, 2.0))) < 0.0) THEN Infinity ELSE (POWER((cast(Z#22 as double) - (r#48 * 11.3930197268589)), 2.0) / (1.0 - POWER(r#48, 2.0))) END)#114 > 1.0) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.


                                                                                

Number of DENTIST outliers detected: 0
+---------------+-----+---------+---+---+-------------------+-------+-----------+--------------------+--------------------+--------------------+
|             ID|chrom|      pos|ref|alt|                  Z|   pval|         R2|                   r|         t_dentist_s|   nlog10p_dentist_s|
+---------------+-----+---------+---+---+-------------------+-------+-----------+--------------------+--------------------+--------------------+
|1_154662946_C_T|    1|154662946|  C|  T|-0.0635976583288984| 0.9494| 1.3828e-05|0.008661797923748287| 0.02633732386848109|  1.5851348809150372|
|1_154662972_C_T|    1|154662972|  C|  T|   1.47142857142857| 0.1413|0.000146955|0.008661797923748287|  1.8845689564218693| 0.07155012982540826|
|1_154663409_G_T|    1|154663409|  G|  T|  0.686363636363636| 0.4926|1.10038e-06|0.008661797923748287| 0.34539322804009587|  0.5345307684064269|
|1_154663800_G_A|    1|154663800|  G|  A|  0.727272727272727| 0.4676| 9.8361e-07|0.00866179

In [4]:

# Count the number of DENTIST outliers and create a new column
n_dentist_s_outlier = df.filter((df.R2 > r2_threshold) & (df.nlog10p_dentist_s > nlog10p_dentist_s_threshold)).count()
print("Number of DENTIST outliers detected:", n_dentist_s_outlier)

df = df.withColumn(
    "dentist_outlier",
    F.when((df.R2 > r2_threshold) & (df.nlog10p_dentist_s > nlog10p_dentist_s_threshold), 1).otherwise(0)
)


23/10/18 09:37:22 WARN ExtractPythonUDFFromJoinCondition: The join condition:(calc_nlog10p_dentist_s(CASE WHEN ((POWER((cast(Z#22 as double) - (r#48 * 11.3930197268589)), 2.0) / (1.0 - POWER(r#48, 2.0))) < 0.0) THEN Infinity ELSE (POWER((cast(Z#22 as double) - (r#48 * 11.3930197268589)), 2.0) / (1.0 - POWER(r#48, 2.0))) END)#114 > 1.0) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.
Number of DENTIST outliers detected: 0


In [30]:
df.show(5)

+---------------+-----+---------+---+---+-------------------+------+-----------+--------------------+-------------------+
|             ID|chrom|      pos|ref|alt|                  Z|  pval|         R2|                   r|        t_dentist_s|
+---------------+-----+---------+---+---+-------------------+------+-----------+--------------------+-------------------+
|1_154662946_C_T|    1|154662946|  C|  T|-0.0635976583288984|0.9494| 1.3828e-05|0.008661797923748287|0.02633732386848109|
|1_154662972_C_T|    1|154662972|  C|  T|   1.47142857142857|0.1413|0.000146955|0.008661797923748287| 1.8845689564218693|
|1_154663409_G_T|    1|154663409|  G|  T|  0.686363636363636|0.4926|1.10038e-06|0.008661797923748287|0.34539322804009587|
|1_154663800_G_A|    1|154663800|  G|  A|  0.727272727272727|0.4676| 9.8361e-07|0.008661797923748287| 0.3951533916088011|
|1_154664496_C_T|    1|154664496|  C|  T|   1.45714285714286|0.1457|0.000144346|0.008661797923748287| 1.8455488380217497|
+---------------+-----+-