In [1]:
import pandas as pd
from pyspark.sql import SparkSession


### For spark section

In [17]:
# Initialize SparkSession
spark = SparkSession.builder.appName("txt to DataFrame").getOrCreate()

# Load CSV file into DataFrame
df = spark.read.csv("/homes/zhe/Desktop/programming/p5/exam/dbNSFP4.9a.txt.gz.SMALL",sep='\t', header=True, inferSchema=True)

# Show the DataFrame content
df.show()

+----+------------+---+---+-----+-----+------------+--------+-----------------+--------+-----------------+-----+--------+---------------+--------------------+-----------------+-----------+----------------+-------------+-------------+------------+------------+---------+---------+------+-------------+---+-------------+----------+--------+--------+----------------+----------------+---------------+--------+------------------+---------------------+----------+------------------------+---------+------------+--------------------------+-----------+--------------------+------------------------+-------------------+--------------------+------------------------+-------------------+---------+-----------------------+--------+---------+--------------------+----------------------------------+-------------------+--------------------+------------------+----------------------+--------------------------+---------------------+------------+--------------------------+-----------+-------------+----------------

### What are average (mean), minimum and maximum score for the column SIFT_score?

In [18]:
from pyspark.sql.functions import mean, min, max, when, split, col

# Replace '.' with null
df = df.withColumn("SIFT4G_score", when(col("SIFT4G_score") == ".", None).otherwise(col("SIFT4G_score")))

# Split multiple scores and take the first one
df = df.withColumn("SIFT4G_score", split(col("SIFT4G_score"), ";").getItem(0))

# Convert to double
df = df.withColumn("SIFT4G_score", col("SIFT4G_score").cast("double"))

# Recalculate statistics
sift_mean = df.select(mean("SIFT4G_score")).collect()[0][0]
sift_min = df.select(min("SIFT4G_score")).collect()[0][0]
sift_max = df.select(max("SIFT4G_score")).collect()[0][0]

print(f"Mean SIFT4G score: {sift_mean}")
print(f"Minimum SIFT4G score: {sift_min}")
print(f"Maximum SIFT4G score: {sift_max}")



Mean SIFT4G score: 0.15993993146542965
Minimum SIFT4G score: 0.0
Maximum SIFT4G score: 1.0


### Merge together the values from hg19_chr and hg19_pos(1-based) columns into a new column of strings called hg19_chr_pos with the format chr_pos and add it to your dataframe. Remove the original hg19_chr and hg19_pos(1-based) columns. [10 pts]

In [40]:
from pyspark.sql.functions import concat, col, lit

# Step 1 & 2: Merge values and create new column
df1 = df.withColumn("hg19_chr_pos", 
                   concat(col("hg19_pos(1-based)").cast("string"), lit("_"),col("hg19_chr") ))

# Step 3: Remove original columns
columns_to_drop = ["hg19_chr", "hg19_pos(1-based)"]
df1 = df1.drop(*columns_to_drop)

# Verify the changes
df1.select("hg19_chr_pos").show(5)
print(df1.columns)


+------------+
|hg19_chr_pos|
+------------+
|     65565_1|
|     65565_1|
|     65565_1|
|     65566_1|
|     65566_1|
+------------+
only showing top 5 rows

['#chr', 'pos(1-based)', 'ref', 'alt', 'aaref', 'aaalt', 'rs_dbSNP', 'hg18_chr', 'hg18_pos(1-based)', 'aapos', 'genename', 'Ensembl_geneid', 'Ensembl_transcriptid', 'Ensembl_proteinid', 'Uniprot_acc', 'Uniprot_entry', 'HGVSc_ANNOVAR', 'HGVSp_ANNOVAR', 'HGVSc_snpEff', 'HGVSp_snpEff', 'HGVSc_VEP', 'HGVSp_VEP', 'APPRIS', 'GENCODE_basic', 'TSL', 'VEP_canonical', 'cds_strand', 'refcodon', 'codonpos', 'codon_degeneracy', 'Ancestral_allele', 'AltaiNeandertal', 'Denisova', 'VindijiaNeandertal', 'ChagyrskayaNeandertal', 'SIFT_score', 'SIFT_converted_rankscore', 'SIFT_pred', 'SIFT4G_score', 'SIFT4G_converted_rankscore', 'SIFT4G_pred', 'Polyphen2_HDIV_score', 'Polyphen2_HDIV_rankscore', 'Polyphen2_HDIV_pred', 'Polyphen2_HVAR_score', 'Polyphen2_HVAR_rankscore', 'Polyphen2_HVAR_pred', 'LRT_score', 'LRT_converted_rankscore', 'LRT_pred', 'LRT_

#### For which value of "codonpos" are the most effects predicted across all SNPs in the file? Does this make biological sense, argue why or why not. [15 pts]

In [41]:
pred_columns = [col for col in df.columns if col.endswith('_pred')]

print(pred_columns)
print(f"Number of prediction columns: {len(pred_columns)}")

df_pred = df.select(['codonpos']  + pred_columns)
df_pred.show(40)

['SIFT_pred', 'SIFT4G_pred', 'Polyphen2_HDIV_pred', 'Polyphen2_HVAR_pred', 'LRT_pred', 'MutationTaster_pred', 'MutationAssessor_pred', 'FATHMM_pred', 'PROVEAN_pred', 'MetaSVM_pred', 'MetaLR_pred', 'MetaRNN_pred', 'M-CAP_pred', 'PrimateAI_pred', 'DEOGEN2_pred', 'BayesDel_addAF_pred', 'BayesDel_noAF_pred', 'ClinPred_pred', 'LIST-S2_pred', 'ESM1b_pred', 'EVE_Class10_pred', 'EVE_Class20_pred', 'EVE_Class25_pred', 'EVE_Class30_pred', 'EVE_Class40_pred', 'EVE_Class50_pred', 'EVE_Class60_pred', 'EVE_Class70_pred', 'EVE_Class75_pred', 'EVE_Class80_pred', 'EVE_Class90_pred', 'AlphaMissense_pred', 'Aloft_pred', 'fathmm-MKL_coding_pred', 'fathmm-XF_coding_pred']
Number of prediction columns: 35
+--------+---------+-----------+-------------------+-------------------+--------+-------------------+---------------------+-----------+------------+------------+-----------+------------+----------+--------------+------------+-------------------+------------------+-------------+------------+----------+-----

To answer this question accurately, we need to understand what "codonpos" means in the context of genetic analysis. "Codonpos" typically refers to the codon position, which indicates the position of a nucleotide within a codon. In the standard genetic code, codons are triplets of nucleotides that encode amino acids or stop signals. The positions within a codon are numbered 1, 2, and 3. Here's what each codon position typically means:

    Codon position 1: The first nucleotide in the codon
    Codon position 2: The second nucleotide in the codon
    Codon position 3: The third nucleotide in the codon

Now, to address the question about which codon position has the most predicted effects across all SNPs: Without access to the specific file and data mentioned in the question, I can't provide the exact value. However, I can explain why certain codon positions might have more predicted effects and whether it makes biological sense. Typically, mutations in codon position 3 are expected to have the least effect, while mutations in positions 1 and 2 are more likely to have significant effects. This is due to the degeneracy of the genetic code:

    Many codons that differ only in the third position encode the same amino acid (synonymous mutations). For example, UCU, UCC, UCA, and UCG all encode serine.
    Changes in the first or second position are more likely to result in a different amino acid (non-synonymous mutations) or even a stop codon.

If the data shows that codon position 3 has the most predicted effects, it might not make immediate biological sense, as we generally expect changes in this position to be less impactful. However, there could be reasons for this:

    Codon usage bias: Some organisms prefer certain codons over others for the same amino acid, which can affect translation efficiency.
    Splicing effects: Changes in the third position might affect splicing if they occur near intron-exon boundaries.
    RNA structure: The third position could be important for mRNA secondary structure.
    Sample bias: The dataset might have an overrepresentation of certain types of mutations.

On the other hand, if positions 1 or 2 show the most predicted effects, this would align with our biological understanding, as changes in these positions are more likely to alter the amino acid sequence of the resulting protein. To argue whether the results make biological sense, you would need to:

    Identify which codon position has the most predicted effects in your data.
    Consider the types of effects predicted (e.g., synonymous vs. non-synonymous changes).
    Evaluate whether the distribution of effects across codon positions aligns with our understanding of the genetic code and protein synthesis.
    Consider any unique features of the organism or genomic region being studied that might influence the results.

Without the specific data, it's not possible to make a definitive argument. However, this explanation should provide a framework for analyzing and interpreting the results once you have them.

In [37]:
from pyspark.sql import functions as F

# Step 1: Create the 'avg_rankscore' column
df_with_avg = score_df.withColumn(
    'avg_rankscore', 
    F.expr(f"""
        aggregate(
            array({', '.join([f"if(`{col}` != '.' AND `{col}` IS NOT NULL, cast(`{col}` as double), NULL)" for col in rankscore_columns])}),
            0D, 
            (acc, x) -> acc + coalesce(x, 0D), 
            acc -> acc / size(array({', '.join([f"`{col}`" for col in rankscore_columns])}))
        )
    """)
)

# Step 2: Create a new DataFrame with 'codonpos' and 'avg_rankscore'
df2 = df_with_avg.select('codonpos', 'avg_rankscore')

# Step 3: Sort the rows by 'avg_rankscore' in descending order and show the top 10 rows
df2.sort(df2["avg_rankscore"].desc()).show(10)


+--------------------+------------------+
|            codonpos|     avg_rankscore|
+--------------------+------------------+
|2;2;2;2;2;2;2;2;2...|0.6062366666666668|
|2;2;2;2;2;2;2;2;2...|0.5996326315789475|
|2;2;2;2;2;2;2;2;2...|0.5963924561403509|
|1;1;1;1;1;1;1;1;1...|0.5938461403508772|
|2;2;2;2;2;2;2;2;2...|0.5895950877192984|
|2;2;2;2;2;2;2;2;2...|0.5827424561403509|
|1;1;1;1;1;1;1;1;1...|0.5810947368421054|
|2;2;2;2;2;2;2;2;2...|0.5785180701754385|
|2;2;2;2;2;2;2;2;2...|0.5774235087719299|
|2;2;2;2;2;2;2;2;2...|0.5765431578947369|
+--------------------+------------------+
only showing top 10 rows



In [36]:
# Select the relevant columns
rankscore_columns = [col for col in df.columns if col.endswith('_rankscore')]

# Create the score_df
score_df = df.select(['codonpos']  + rankscore_columns)

# Create a new column 'avg_rankscore' to store the average of non-missing values
df_with_avg = score_df.withColumn(
    'avg_rankscore', 
    F.expr(f"""
        aggregate(
            array({', '.join([f"if(`{col}` != '.' AND `{col}` IS NOT NULL, cast(`{col}` as double), NULL)" for col in rankscore_columns])}),
            0D, 
            (acc, x) -> acc + coalesce(x, 0D), 
            acc -> acc / size(array({', '.join([f"`{col}`" for col in rankscore_columns])}))
        )
    """)
)

# Show the result
df2 = df_with_avg.select('codonpos', 'avg_rankscore')
df2.show(5)
# Get the row with the maximum avg_rankscore
df2.sort(df2["avg_rankscore"].desc()).show(10)

# Show the result
max_avg_row.show()

+--------+-------------------+
|codonpos|      avg_rankscore|
+--------+-------------------+
|       1|0.04117350877192984|
|       1|0.04025929824561404|
|       1|0.04101175438596492|
|       2|0.04809070175438597|
|       2|0.04925754385964913|
+--------+-------------------+
only showing top 5 rows

+--------------------+------------------+
|            codonpos|     avg_rankscore|
+--------------------+------------------+
|2;2;2;2;2;2;2;2;2...|0.6062366666666668|
|2;2;2;2;2;2;2;2;2...|0.5996326315789475|
|2;2;2;2;2;2;2;2;2...|0.5963924561403509|
|1;1;1;1;1;1;1;1;1...|0.5938461403508772|
|2;2;2;2;2;2;2;2;2...|0.5895950877192984|
|2;2;2;2;2;2;2;2;2...|0.5827424561403509|
|1;1;1;1;1;1;1;1;1...|0.5810947368421054|
|2;2;2;2;2;2;2;2;2...|0.5785180701754385|
|2;2;2;2;2;2;2;2;2...|0.5774235087719299|
|2;2;2;2;2;2;2;2;2...|0.5765431578947369|
+--------------------+------------------+
only showing top 10 rows

+--------------------+------------------------+--------------------------+--------

In [31]:
from pyspark.sql.functions import split, explode

# Split the 'codonpos' by semicolon and expand into multiple rows
df2 = df2.withColumn("codonpos", explode(split(df2['codonpos'], ';')))

# Now, show the first 20 rows
df2.show(20)


+--------+--------------------+
|codonpos|       avg_rankscore|
+--------+--------------------+
|       1| 0.04117350877192984|
|       1| 0.04025929824561404|
|       1| 0.04101175438596492|
|       2| 0.04809070175438597|
|       2| 0.04925754385964913|
|       2|0.049146315789473684|
|       3| 0.04384894736842105|
|       3| 0.04403140350877193|
|       3| 0.04333719298245615|
|       1| 0.03364456140350878|
|       1| 0.03210456140350878|
|       1| 0.02237421052631579|
|       2|0.041771578947368423|
|       2| 0.03874877192982456|
|       2| 0.04367245614035088|
|       3|0.024340175438596495|
|       3|0.024109473684210524|
|       1| 0.02734456140350877|
|       1|0.021696315789473685|
|       1| 0.01820228070175439|
+--------+--------------------+
only showing top 20 rows



In [9]:
from pyspark.sql import functions as F

# Define filter conditions for different columns based on expected missing values
annotated_snps = df.filter(
    # Basic information columns
    (F.col("#chr").isNotNull()) & (F.col("#chr") != ".") &
    (F.col("pos(1-based)").isNotNull()) & (F.col("pos(1-based)") != ".") &
    (F.col("ref").isNotNull()) & (F.col("ref") != ".") &
    (F.col("alt").isNotNull()) & (F.col("alt") != ".") &
    
    # Amino acid and dbSNP annotations
    (F.col("aaref").isNotNull()) & (F.col("aaref") != ".") &
    (F.col("aaalt").isNotNull()) & (F.col("aaalt") != ".") &
    (F.col("rs_dbSNP").isNotNull()) & (F.col("rs_dbSNP") != ".") & (F.col("rs_dbSNP") != "nan") &
    
    # Gene and protein annotations
    (F.col("genename").isNotNull()) & (F.col("genename") != ".") &
    (F.col("Ensembl_geneid").isNotNull()) & (F.col("Ensembl_geneid") != ".;.;") &
    (F.col("Ensembl_transcriptid").isNotNull()) & (F.col("Ensembl_transcriptid") != ".") &
    (F.col("Ensembl_proteinid").isNotNull()) & (F.col("Ensembl_proteinid") != ".") &
    
    # Protein and functional annotation identifiers
    (F.col("Uniprot_acc").isNotNull()) & (F.col("Uniprot_acc") != ".;.;") &
    (F.col("Uniprot_entry").isNotNull()) & (F.col("Uniprot_entry") != ".;.;")
)

# Group by chromosome to count SNPs per chromosome
chromosome_counts = annotated_snps.groupBy("#chr").count()

# Identify the chromosome with the most annotated SNPs
most_annotated_chr = chromosome_counts.orderBy(F.desc("count")).first()
print(f"The chromosome with the most annotated SNPs is: {most_annotated_chr['#chr']} with {most_annotated_chr['count']} SNPs.")


[Stage 12:>                                                         (0 + 4) / 4]

The chromosome with the most annotated SNPs is: 1 with 994 SNPs.


                                                                                

In [39]:
most_annotated_chr

Row(#chr=1, count=994)

### For how many SNPs documented is the position different between the "hg18" and "hg19" human genome standards? [10 pts]
.	回答第二个问题：“记录的 SNP 中有多少个在 ‘hg18’ 和 ‘hg19’ 人类基因组标准中的位置不同？”
	•	需要列：hg18_pos(1-based) 和 hg19_pos(1-based)
	•	您可以使用 filter 来找到 hg18_pos(1-based) 与 hg19_pos(1-based) 不相等的记录。

In [40]:
# Create a new DataFrame with the differences
df_diff = annotated_snps.withColumn("df_18", annotated_snps["`hg18_pos(1-based)`"] - annotated_snps["`pos(1-based)`"])\
    .withColumn("df_19", annotated_snps["`hg19_pos(1-based)`"] - annotated_snps["`pos(1-based)`"])

# Show the DataFrame with the differences
df_diff.select("df_18",'df_19').show()


+--------+-----+
|   df_18|df_19|
+--------+-----+
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
|-10137.0|  0.0|
+--------+-----+
only showing top 20 rows



In [41]:
from pyspark.sql import functions as F

# Calculate the difference between df_18 and df_19
df_diff = df_diff.withColumn("difference", F.col("df_18") - F.col("df_19"))

df_diff.show()

count_diff = df_diff.filter((F.col("difference")!=("-10137"))).count()
count_diff

+----+------------+---+---+-----+-----+------------+--------+-----------------+--------+-----------------+-----+-----------+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+-------------+------------+------------+----------------+--------------------+------------+-------------+----+-------------+----------+--------+--------+----------------+----------------+---------------+--------+------------------+---------------------+----------+------------------------+---------+------------+--------------------------+-----------+--------------------+------------------------+-------------------+--------------------+------------------------+-------------------+---------+-----------------------+--------+---------+--------------------+----------------------------------+-------------------+--------------------+-------------------+----------------------+--------------------------+---------------------+------------+----------------------

0

In [42]:
from pyspark.sql import functions as F

# Add the new column "positions_differ"
df = df.withColumn(
    "positions_differ",
    F.when(df["`hg18_pos(1-based)`"] != df["`hg19_pos(1-based)`"], 1).otherwise(0)
)

# Count SNPs where positions differ
count_differ = df.filter(df["positions_differ"] == 1).count()
print(f"Number of SNPs with different positions between hg18 and hg19: {count_differ}")

Number of SNPs with different positions between hg18 and hg19: 7848


### How many of the models represented in the dataset give no predictions at all? [10 pts]
	.	回答第三个问题：“数据集中有多少模型没有给出任何预测结果？”
	•	可能涉及的列：包含预测得分的列（可以是与不同模型相关的列，比如功能性预测工具得分列）
	•	可以使用 filter 和 isNull()/isNotNull() 来检查这些列是否存在任何非空预测值，统计没有任何预测结果的模型。

It compiles prediction scores from 46 prediction algorithms (SIFT, SIFT4G, Polyphen2-HDIV, Polyphen2-HVAR, LRT, MutationTaster2, MutationAssessor, FATHMM, MetaSVM, MetaLR, MetaRNN, CADD, CADD_hg19, VEST4, PROVEAN, FATHMM-MKL coding, FATHMM-XF coding, fitCons x 4, LINSIGHT, DANN, GenoCanyon, Eigen, Eigen-PC, M-CAP, REVEL, MutPred, MVP, gMVP, MPC, PrimateAI, GEOGEN2, BayesDel_addAF, BayesDel_noAF, ClinPred, LIST-S2, VARITY, ESM1b, EVE, AlphaMissense, MutFormer, PHACTboost, MutScore, ALoFT), 9 conservation scores (PhyloP x 3, phastCons x 3, GERP++, SiPhy and bStatistic) and other related information including allele frequencies observed in the 1000 Genomes Project phase 3 data, UK10K cohorts data, ExAC consortium data, gnomAD data and the NHLBI Exome Sequencing Project ESP6500 data, various gene IDs from different databases, functional descriptions of genes, gene expression and gene interaction information.

In [43]:
pred_columns = [col for col in df.columns if col.endswith('_pred')]

print(pred_columns)
print(f"Number of prediction columns: {len(pred_columns)}")

df_pred = df.select(pred_columns)
df_pred.show()

['SIFT_pred', 'SIFT4G_pred', 'Polyphen2_HDIV_pred', 'Polyphen2_HVAR_pred', 'LRT_pred', 'MutationTaster_pred', 'MutationAssessor_pred', 'FATHMM_pred', 'PROVEAN_pred', 'MetaSVM_pred', 'MetaLR_pred', 'MetaRNN_pred', 'M-CAP_pred', 'PrimateAI_pred', 'DEOGEN2_pred', 'BayesDel_addAF_pred', 'BayesDel_noAF_pred', 'ClinPred_pred', 'LIST-S2_pred', 'ESM1b_pred', 'EVE_Class10_pred', 'EVE_Class20_pred', 'EVE_Class25_pred', 'EVE_Class30_pred', 'EVE_Class40_pred', 'EVE_Class50_pred', 'EVE_Class60_pred', 'EVE_Class70_pred', 'EVE_Class75_pred', 'EVE_Class80_pred', 'EVE_Class90_pred', 'AlphaMissense_pred', 'Aloft_pred', 'fathmm-MKL_coding_pred', 'fathmm-XF_coding_pred']
Number of prediction columns: 35
+---------+-----------+-------------------+-------------------+--------+-------------------+---------------------+-----------+------------+------------+-----------+------------+----------+--------------+------------+-------------------+------------------+-------------+------------+----------+--------------

### So at the pred_columns, the missing values look like '.'.

In [44]:
# Generate a dictionary with counts of NaN values for each column
nan_counts = {col: df_pred.filter((F.col(col))==(".")).count() for col in pred_columns}

# Display NaN counts
for col, count in nan_counts.items():
    if count == 0:
        print(f"{col} have not given any predicted values\n" )
        
# Calculate total number of columns with no predictions
total = len([col for col, count in nan_counts.items() if count == 0])
print(f"There are {total} predictor that does not give any predictions")


Aloft_pred have not given any predicted values

There are 1 predictor that does not give any predictions


###  A Wrong way based the misunderstanding of predictors

In [45]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Initialize SparkSession
spark = SparkSession.builder.appName("models").getOrCreate()

# Load the tab-separated .txt file into DataFrame
model = spark.read.option("delimiter", "\t").csv("/homes/zhe/Desktop/programming/p5/exam/db_type.txt")

predictor = model.filter((F.col("_c2"))==("true"))
# Show the DataFrame content
predictor.show()
# Count the number of rows in the filtered DataFrame
row_count = predictor.count()
print(row_count)


+--------------------+---------+----+
|                 _c0|      _c1| _c2|
+--------------------+---------+----+
|               aapos|  Integer|true|
|            genename|   String|true|
|      Ensembl_geneid|   String|true|
|Ensembl_transcriptid|   String|true|
|   Ensembl_proteinid|   String|true|
|         Uniprot_acc|   String|true|
|       Uniprot_entry|   String|true|
|       HGVSc_ANNOVAR|   String|true|
|       HGVSp_ANNOVAR|   String|true|
|        HGVSc_snpEff|   String|true|
|        HGVSp_snpEff|   String|true|
|           HGVSc_VEP|   String|true|
|           HGVSp_VEP|   String|true|
|              APPRIS|   String|true|
|       GENCODE_basic|Character|true|
|                 TSL|   String|true|
|       VEP_canonical|     Flag|true|
|          cds_strand|Character|true|
|            refcodon|   String|true|
|            codonpos|  Integer|true|
+--------------------+---------+----+
only showing top 20 rows

48


24/11/11 17:09:23 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [46]:
# Step 1: Extract the column from `df1` and convert it to a list
column_list = predictor.select("_c0").rdd.flatMap(lambda x: x).collect()
print(column_list)

# Generate a dictionary with counts of NaN values for each column
nan_counts = {col: df.filter((F.col(col))==(".")).count() for col in column_list}

# Display NaN counts
for col, count in nan_counts.items():
    if count == 0:
        print(f"{col} have not given any predicted values\n" )
        
# Calculate total number of columns with no predictions
total = len([col for col, count in nan_counts.items() if count == 0])
print(f"There are {total} predictor that does not give any predictions")


['aapos', 'genename', 'Ensembl_geneid', 'Ensembl_transcriptid', 'Ensembl_proteinid', 'Uniprot_acc', 'Uniprot_entry', 'HGVSc_ANNOVAR', 'HGVSp_ANNOVAR', 'HGVSc_snpEff', 'HGVSp_snpEff', 'HGVSc_VEP', 'HGVSp_VEP', 'APPRIS', 'GENCODE_basic', 'TSL', 'VEP_canonical', 'cds_strand', 'refcodon', 'codonpos', 'codon_degeneracy', 'SIFT_score', 'SIFT_pred', 'SIFT4G_score', 'SIFT4G_pred', 'MutationAssessor_score', 'MutationAssessor_pred', 'FATHMM_score', 'FATHMM_pred', 'PROVEAN_score', 'PROVEAN_pred', 'MetaRNN_score', 'MetaRNN_pred', 'MutPred_Top5features', 'MVP_score', 'MPC_score', 'DEOGEN2_score', 'DEOGEN2_pred', 'LIST-S2_score', 'LIST-S2_pred', 'Aloft_Fraction_transcripts_affected', 'Aloft_prob_Tolerant', 'Aloft_prob_Recessive', 'Aloft_prob_Dominant', 'Aloft_pred', 'Aloft_Confidence', 'clinvar_review', 'Interpro_domain']
aapos have not given any predicted values

genename have not given any predicted values

Ensembl_geneid have not given any predicted values

Ensembl_transcriptid have not given any

### What Amino acid (in the "aaref" column) has the highest average score of the predictors associated with it? [10pts]

In [47]:
from pyspark.sql import functions as F

# Select the relevant columns: 'aaref' and the rankscore columns
rankscore_columns = [col for col in df.columns if col.endswith('_rankscore')]

# Create the score_df
score_df = df.select(['aaref'] + rankscore_columns)

# Create a new column 'avg_rankscore' to store the average of non-missing values
df_with_avg = score_df.withColumn(
    'avg_rankscore', 
    F.expr(f"""
        aggregate(
            array({', '.join([f"if(`{col}` != '.' AND `{col}` IS NOT NULL, cast(`{col}` as double), NULL)" for col in rankscore_columns])}),
            0D, 
            (acc, x) -> acc + coalesce(x, 0D), 
            acc -> acc / size(array({', '.join([f"`{col}`" for col in rankscore_columns])}))
        )
    """)
)

# Show the result
#df_with_avg.select('aaref', 'avg_rankscore').show()

# Get the row with the maximum avg_rankscore
max_avg_row = df_with_avg.orderBy(F.desc("avg_rankscore")).limit(10)

# Show the result
max_avg_row.show()

+-----+------------------------+--------------------------+------------------------+------------------------+-----------------------+----------------------------------+--------------------------+--------------------------+---------------------------+---------------+-----------------+----------------+-----------------+---------------+---------------+-----------------+-------------+--------------+-------------+-------------------+-----------------+------------------------+-----------------------+------------------+-----------------+------------------+-------------------+----------------------+-----------------------+---------------+-------------+-----------------------+--------------------+-------------------+------------------+------------------+--------------+---------------------------+--------------------------+--------------------------+-----------------------------+--------------------+----------------------------+-------------------------+-------------------------+----------------

In [48]:
df_with_avg = df_with_avg.withColumn(
    'non_missing_count',
    sum([F.when(F.col(c).isNotNull(), 1).otherwise(0) for c in rankscore_columns])
)

# Show the results with the count of non-missing values
df_with_avg.select('aaref', 'avg_rankscore', 'non_missing_count').show()


+-----+--------------------+-----------------+
|aaref|       avg_rankscore|non_missing_count|
+-----+--------------------+-----------------+
|    M| 0.04117350877192984|               57|
|    M| 0.04025929824561404|               57|
|    M| 0.04101175438596492|               57|
|    M| 0.04809070175438597|               57|
|    M| 0.04925754385964913|               57|
|    M|0.049146315789473684|               57|
|    M| 0.04384894736842105|               57|
|    M| 0.04403140350877193|               57|
|    M| 0.04333719298245615|               57|
|    K| 0.03364456140350878|               57|
|    K| 0.03210456140350878|               57|
|    K| 0.02237421052631579|               57|
|    K|0.041771578947368423|               57|
|    K| 0.03874877192982456|               57|
|    K| 0.04367245614035088|               57|
|    K|0.024340175438596495|               57|
|    K|0.024109473684210524|               57|
|    K| 0.02734456140350877|               57|
|    K|0.0216

## FOR sql

Read in dataset contained in the file /data/datasets/prog5/SQL_example.csv with Python. [3 pts]

Decide how you want to design a Normalized table design and which datatypes you will need for the columns. CREATE the tables you need (again: use sqlalchemy for this!) [5 pts]

INSERT the data from the SQL_example file into the SQLite database. [7 pts]

Using a SQL SELECT query, find the maximum weight of a product being recalled. [5 pts]

#### Write the csv to sql database

### connect mysql databse

In [70]:
import pandas as pd
from sqlalchemy import create_engine, Column, Integer, Float, String, ForeignKey, text, MetaData
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.engine.url import URL

# Read configuration
with open('/homes/zhe/my.cnf', 'r') as file:
    next(file)
    config = dict(line.strip().split('=') for line in file)

username = config['user']
password = config['password']
host = "mariadb.bin.bioinf.nl"
database = config['database']

# Create the SQLAlchemy connection string
connection_string = f"mysql+pymysql://{username}:{password}@{host}/{database}"
engine = create_engine(connection_string)

#### query way to make FSIS table

In [None]:
def create_sample_table(engine):
    create_table_query = text("""
    CREATE TABLE IF NOT EXISTS FSIS (
        Recall_Number VARCHAR(10) PRIMARY KEY,
        Recall_Date VARCHAR(20),
        Recall_Class VARCHAR(10),
        Product VARCHAR(255),
        Reason_for_Recall VARCHAR(255),
        Pounds_Recalled FLOAT
    )
    """)
    
    with engine.connect() as connection:
        connection.execute(create_table_query)
        connection.commit()

# Call this function after creating the engine
create_sample_table(engine)

def insert_data(connection, fsis_data):
    insert_query = text("""
    INSERT INTO FSIS (Recall_Number, Recall_Date, Recall_Class, Product, Reason_for_Recall, Pounds_Recalled)
    VALUES (:Recall_Number, :Recall_Date, :Recall_Class, :Product, :Reason_for_Recall, :Pounds_Recalled)
    ON DUPLICATE KEY UPDATE
    Recall_Date = :Recall_Date,
    Recall_Class = :Recall_Class,
    Product = :Product,
    Reason_for_Recall = :Reason_for_Recall,
    Pounds_Recalled = :Pounds_Recalled
    """)
    
    try:
        connection.execute(insert_query, {
            'Recall_Number': str(fsis_data['Recall Number']),
            'Recall_Date': str(fsis_data['Recall Date']),
            'Recall_Class': str(fsis_data['Recall Class']),
            'Product': str(fsis_data['Product'])[:255],
            'Reason_for_Recall': str(fsis_data['Reason for Recall'])[:255],
            'Pounds_Recalled': float(fsis_data['Pounds Recalled'])
        })
        connection.commit()
    except Exception as error:
        print(f"Error inserting data: {str(error)}")
        
try:
    filepath = '/homes/zhe/Desktop/programming/p5/exam/SQL_example.csv'
    df = pd.read_csv(filepath, skiprows=1)
    ### besause the cloumn looks like 333,470 , so we have to make it into float, then will get the max values
    df['Pounds Recalled'] = df['Pounds Recalled'].str.replace(',','').astype(float)
    # if there are Nan values at my df, replace it by None,because a sql. it is not allowed to use Nan
    # Replace NaN values with None (to be compatible with SQL)
    df = df.where(pd.notna(df), None)


    
    
    print("CSV Column Names:")
    print(df.columns.tolist())
    
    with engine.connect() as connection:
        for _, row in df.iterrows():
            print(f"Inserting data for Recall Number: {row['Recall Number']}")
            insert_data(connection, row.to_dict())
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    if 'connection' in locals():
        connection.close()



CSV Column Names:
['Recall Date', 'Recall Number', 'Recall Class', 'Product', 'Reason for Recall', 'Pounds Recalled']
Inserting data for Recall Number: 001-2014
Inserting data for Recall Number: 002-2014
Inserting data for Recall Number: 003-2014
Inserting data for Recall Number: 004-2014
Inserting data for Recall Number: 005-2014
Inserting data for Recall Number: 006-2014
Inserting data for Recall Number: 007-2014
Inserting data for Recall Number: 008-2014
Inserting data for Recall Number: 009-2014
Inserting data for Recall Number: 010-2014
Inserting data for Recall Number: 011-2014
Inserting data for Recall Number: 012-2014
Inserting data for Recall Number: 013-2014
Inserting data for Recall Number: 014-2014
Inserting data for Recall Number: 015-2014
Inserting data for Recall Number: 016-2014
Inserting data for Recall Number: 017-2014
Inserting data for Recall Number: 018-2014
Inserting data for Recall Number: 019-2014
Inserting data for Recall Number: 020-2014
Inserting data for Rec

#### Base way

In [72]:
Base = declarative_base()

class FSIS(Base):
    __tablename__ = 'FSIS'
    Recall_Number = Column(String(10), primary_key=True)
    Recall_Date = Column(String(20))
    Recall_Class = Column(String(10))
    Product = Column(String(255))
    Reason_for_Recall = Column(String(255))
    Pounds_Recalled = Column(Float)
# Create the table
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

def insert_data(session, fsis_data):
    try:
        with session.begin():
            fsis = FSIS(
                Recall_Number=str(fsis_data['Recall Number']),
                Recall_Date=str(fsis_data['Recall Date']),
                Recall_Class=str(fsis_data['Recall Class']),
                Product=str(fsis_data['Product'])[:255],
                Reason_for_Recall=str(fsis_data['Reason for Recall'])[:255],
                Pounds_Recalled=float(fsis_data['Pounds Recalled'])
            )
            session.merge(fsis)
        session.commit()
    except SQLAlchemyError as error:
        session.rollback()
        print(f"Error inserting data: {str(error)}")

try:
    filepath = '/homes/zhe/Desktop/programming/p5/exam/FSIS-Recall-Summary-2014.csv'
    df = pd.read_csv(filepath,skiprows=1)
    df['Pounds Recalled'] = df['Pounds Recalled'].str.replace(',','').astype(float)
    # Replace NaN values with None (to be compatible with SQL)
    df = df.where(pd.notna(df), None)

    
    
    print("CSV Column Names:")
    print(df.columns.tolist())
    
    for _, row in df.iterrows():
        print(f"Inserting data for Recall Number: {row['Recall Number']}")
        insert_data(session, row.to_dict())
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    session.close()

CSV Column Names:
['Recall Date', 'Recall Number', 'Recall Class', 'Product', 'Reason for Recall', 'Pounds Recalled']
Inserting data for Recall Number: 001-2014
Inserting data for Recall Number: 002-2014
Inserting data for Recall Number: 003-2014
Inserting data for Recall Number: 004-2014
Inserting data for Recall Number: 005-2014
Inserting data for Recall Number: 006-2014
Inserting data for Recall Number: 007-2014
Inserting data for Recall Number: 008-2014
Inserting data for Recall Number: 009-2014
Inserting data for Recall Number: 010-2014
Inserting data for Recall Number: 011-2014
Inserting data for Recall Number: 012-2014
Inserting data for Recall Number: 013-2014
Inserting data for Recall Number: 014-2014
Inserting data for Recall Number: 015-2014
Inserting data for Recall Number: 016-2014
Inserting data for Recall Number: 017-2014
Inserting data for Recall Number: 018-2014
Inserting data for Recall Number: 019-2014
Inserting data for Recall Number: 020-2014
Inserting data for Rec

In [73]:
from sqlalchemy import func

max_weight = session.query(func.max(FSIS.Pounds_Recalled)).scalar()
print(f"The maximum weight recalled is: {max_weight}")


The maximum weight recalled is: 8742700.0


SELECT MAX(`Pounds_Recalled`) AS Max_Weight_Recalled

FROM FSIS;


### MPI