In [1]:
from google.colab import files
uploaded = files.upload()

Saving archive.zip to archive.zip


In [2]:
!pip install pyspark



In [3]:
import zipfile
import os

# Get the uploaded file name
zip_filename = list(uploaded.keys())[0]

# Extract the ZIP file
with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
    zip_ref.extractall("extracted_data")

# List extracted files
os.listdir("extracted_data")

['surv_variants.csv']

In [4]:
from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder.appName("ZipFileAnalysis").getOrCreate()

# Find the extracted CSV file
csv_files = [f for f in os.listdir("extracted_data") if f.endswith(".csv")]
csv_path = f"extracted_data/{csv_files[0]}"  # Load the first CSV file

# Load CSV file into PySpark DataFrame
spark_df = spark.read.csv(csv_path, header=True, inferSchema=True)

# Show first few rows
spark_df.show(5)

+---------+----------+--------+----------+-------+------------+--------+--------+--------------------+------------------+------------------+------------------+
|  Country| first_seq|num_seqs|  last_seq|variant|censure_date|duration|censored|      mortality_rate|       total_cases|      total_deaths|       growth_rate|
+---------+----------+--------+----------+-------+------------+--------+--------+--------------------+------------------+------------------+------------------+
|    China|2019-10-22|       3|2020-06-03| S.Q677|  2020-06-21|     225|    true| 0.05298270465397884|18259.460122699387| 967.4355828220858| 16447.43070611852|
|      USA|2020-03-03|   26022|2021-11-19| S.Q677|  2020-11-01|     626|   false|0.016111465270624598|256577.59623403364| 4133.841030944974| 84748.74587601596|
|   Brazil|2020-03-09|    1553|2021-11-12| S.Q677|  2020-11-07|     613|   false|0.027845990904288728| 101550.1404656567|2827.7642877359194| 42356.90742557735|
|Australia|2020-03-20|      88|2021-11-1

In [5]:
spark_df.describe().show()

+-------+-----------+-----------------+-------+------------------+--------------------+--------------------+------------------+------------------+
|summary|    Country|         num_seqs|variant|          duration|      mortality_rate|         total_cases|      total_deaths|       growth_rate|
+-------+-----------+-----------------+-------+------------------+--------------------+--------------------+------------------+------------------+
|  count|       4113|             4113|   4113|              4113|                4113|                4113|              4113|              3585|
|   mean|       NULL|4216.427911500122|   NULL|182.55263797714562|0.019360191245763343|  119831.57411797598|2118.2666133071757|11682.192503622668|
| stddev|       NULL|39719.29351739689|   NULL|161.95094826351848|0.014504036853558729|  1027564.1473311749|15801.688542051683| 41524.51381621186|
|    min|Afghanistan|                1|20A.EU1|                 0|                 0.0|2.015925813930047...|          

In [6]:
from pyspark.sql.functions import col, count

missing_values = spark_df.select([count(col(c).isNull().cast("int")).alias(c) for c in spark_df.columns])
missing_values.show()


+-------+---------+--------+--------+-------+------------+--------+--------+--------------+-----------+------------+-----------+
|Country|first_seq|num_seqs|last_seq|variant|censure_date|duration|censored|mortality_rate|total_cases|total_deaths|growth_rate|
+-------+---------+--------+--------+-------+------------+--------+--------+--------------+-----------+------------+-----------+
|   4113|     4113|    4113|    4113|   4113|        4113|    4113|    4113|          4113|       4113|        4113|       4113|
+-------+---------+--------+--------+-------+------------+--------+--------+--------------+-----------+------------+-----------+



In [7]:
spark_df.select("Country").distinct().show()

+-------------+
|      Country|
+-------------+
|Côte d'Ivoire|
|       Russia|
|     Paraguay|
|      Senegal|
|       Sweden|
|   Cabo Verde|
|       Guyana|
|  Philippines|
|     Djibouti|
|     Malaysia|
|    Singapore|
|         Fiji|
|       Turkey|
|       Malawi|
|         Iraq|
|      Germany|
|      Comoros|
|  Afghanistan|
|     Cambodia|
|       Rwanda|
+-------------+
only showing top 20 rows



In [8]:
from pyspark.sql.functions import col

spark_df.groupBy("num_seqs").count().orderBy(col("count").desc()).show()

+--------+-----+
|num_seqs|count|
+--------+-----+
|       1|  480|
|       2|  240|
|       3|  182|
|       4|  148|
|       5|  118|
|       6|   91|
|       8|   76|
|       7|   75|
|      10|   57|
|       9|   48|
|      11|   47|
|      12|   42|
|      13|   37|
|      14|   36|
|      19|   34|
|      15|   34|
|      22|   29|
|      16|   28|
|      28|   26|
|      21|   26|
+--------+-----+
only showing top 20 rows



In [13]:
spark_df = spark_df.drop("first_seq", "last_seq")
spark_df.show(5)


+---------+--------+-------+------------+--------+--------+--------------------+------------------+------------------+------------------+
|  Country|num_seqs|variant|censure_date|duration|censored|      mortality_rate|       total_cases|      total_deaths|       growth_rate|
+---------+--------+-------+------------+--------+--------+--------------------+------------------+------------------+------------------+
|    China|       3| S.Q677|  2020-06-21|     225|    true| 0.05298270465397884|18259.460122699387| 967.4355828220858| 16447.43070611852|
|      USA|   26022| S.Q677|  2020-11-01|     626|   false|0.016111465270624598|256577.59623403364| 4133.841030944974| 84748.74587601596|
|   Brazil|    1553| S.Q677|  2020-11-07|     613|   false|0.027845990904288728| 101550.1404656567|2827.7642877359194| 42356.90742557735|
|Australia|      88| S.Q677|  2020-11-18|     604|   false|0.009905405476304151| 1402.239578652566|13.889751601475552|1227.1224997033116|
|   Sweden|     810| S.Q677|  2020

In [14]:
print(f"Total rows before removing duplicates: {spark_df.count()}")

spark_df = spark_df.dropDuplicates()

print(f"Total rows after removing duplicates: {spark_df.count()}")


Total rows before removing duplicates: 4113
Total rows after removing duplicates: 4113


In [16]:
csv_path = "output"

# Overwrite existing files
spark_df.coalesce(1).write.mode("overwrite").csv(csv_path, header=True)



In [17]:
import os
import shutil

# Define directory where PySpark saved the files
csv_folder = "output"

# Find the actual CSV file inside the folder
files = os.listdir(csv_folder)
csv_file = [f for f in files if f.startswith("part")][0]  # Find 'part' file

# Rename it to output.csv
shutil.move(f"{csv_folder}/{csv_file}", "output.csv")


'output.csv'

In [18]:
from google.colab import files
files.download("output.csv")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>