In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth, to_date, lower, regexp_replace
from pyspark.sql.types import IntegerType, DoubleType, DateType
# Create (or reuse) the Spark session used by the rest of the script.
spark_builder = SparkSession.builder.appName("LungCancerDataProcessing")
spark = spark_builder.getOrCreate()

# Read the CSV, treating the first row as the header and letting Spark
# infer each column's type.
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv("/content/Lung Cancer.csv")

# Preview the raw data: schema first, then the first five untruncated rows.
print("Original DataFrame Schema:")
df.printSchema()
print("\nOriginal DataFrame Sample Data:")
df.show(n=5, truncate=False)
# Standardize the column NAMES: lower-case them and replace spaces with
# underscores. This is done with plain Python string methods on the name
# itself; applying Spark's lower()/regexp_replace() to `c` would operate on
# the column's values (turning every column into a rewritten string) while
# leaving the names untouched.
new_column_names = [c.lower().replace(" ", "_") for c in df.columns]
# toDF renames positionally, so values and data types are left as loaded.
df = df.toDF(*new_column_names)
print("\nDataFrame after standardizing column names:")
df.show(5, truncate=False)
# Remove exact duplicate rows, then discard every row that still has a null
# in any column.
df_no_duplicates = df.dropDuplicates()
df_cleaned = df_no_duplicates.dropna()

# Report how many rows the cleaning removed.
original_row_count = df.count()
print(f"\nOriginal row count: {original_row_count}")
cleaned_row_count = df_cleaned.count()
print(f"Row count after removing duplicates and nulls: {cleaned_row_count}")
# Normalize column types one column at a time, in the same order as before:
# the two date columns are parsed with the "yyyy-MM-dd" pattern, then the
# numeric columns are cast to their target types.
df_transformed = df_cleaned
for date_column in ("diagnosis_date", "end_treatment_date"):
    df_transformed = df_transformed.withColumn(
        date_column, to_date(col(date_column), "yyyy-MM-dd")
    )

numeric_casts = (
    ("id", IntegerType()),
    ("age", IntegerType()),
    ("bmi", DoubleType()),
    ("cholesterol_level", DoubleType()),
)
for numeric_column, target_type in numeric_casts:
    df_transformed = df_transformed.withColumn(
        numeric_column, col(numeric_column).cast(target_type)
    )
# Derive year / month / day-of-month columns from the diagnosis date.
diagnosis_col = col("diagnosis_date")
df_final = (
    df_transformed
    .withColumn("diagnosis_year", year(diagnosis_col))
    .withColumn("diagnosis_month", month(diagnosis_col))
    .withColumn("diagnosis_day", dayofmonth(diagnosis_col))
)

# Show the resulting schema and the first five untruncated rows.
print("\nFinal Transformed DataFrame Schema:")
df_final.printSchema()
print("\nFinal Transformed DataFrame Sample Data:")
df_final.show(n=5, truncate=False)

# The Spark part of the script is finished; shut the session down.
spark.stop()
import os
from google.cloud import bigquery
from google.colab import auth
# Authenticate the Colab user so the BigQuery client can obtain credentials.
auth.authenticate_user()

# Destination identifiers, kept as bare components (no surrounding
# whitespace, no project/dataset prefix) so that joining them with "."
# yields exactly one "project.dataset.table" reference.
project_id = "mystic-pagoda-468209-a2"
dataset_id = "venky12"
table_id = "venky14"
csv_file_path = "/content/Lung Cancer.csv"

# Pass the project explicitly instead of relying on the environment to
# supply a default project.
client = bigquery.Client(project=project_id)

# Fully-qualified destination table: "mystic-pagoda-468209-a2.venky12.venky14".
table_ref = f"{project_id}.{dataset_id}.{table_id}"

# Load-job settings: the source is a CSV whose first row is a header to skip,
# the schema is auto-detected, and the write disposition is WRITE_TRUNCATE.
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
job_config.autodetect = True
job_config.write_disposition = "WRITE_TRUNCATE"

# Upload the local CSV to the destination table. The path comes from
# csv_file_path rather than a second hard-coded literal, so the configured
# path and the uploaded file cannot drift apart.
with open(csv_file_path, "rb") as source_file:
    load_job = client.load_table_from_file(
        source_file,
        destination=table_ref,
        job_config=job_config,
    )

# Wait for the load job to complete before inspecting the table.
load_job.result()

# Fetch the table metadata to report how many rows it now holds.
destination_table = client.get_table(table_ref)
print(f"Loaded {destination_table.num_rows} rows into {table_ref}.")

Original DataFrame Schema:
root
 |-- id: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- diagnosis_date: date (nullable = true)
 |-- cancer_stage: string (nullable = true)
 |-- family_history: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- cholesterol_level: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- asthma: integer (nullable = true)
 |-- cirrhosis: integer (nullable = true)
 |-- other_cancer: integer (nullable = true)
 |-- treatment_type: string (nullable = true)
 |-- end_treatment_date: date (nullable = true)
 |-- survived: integer (nullable = true)


Original DataFrame Sample Data:
+---+----+------+-----------+--------------+------------+--------------+--------------+----+-----------------+------------+------+---------+------------+--------------+------------------+--------+
|id |age |gender|co