In [9]:
#pip install google-cloud-bigquery

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import col, trim, lower,coalesce,lit

spark = SparkSession.builder.appName("StudentsMigrationAnalysis").getOrCreate()

schema1 = StructType([
    StructField("student_id", StringType(), True),
    StructField("origin_country", StringType(), True),
    StructField("destination_country", StringType(), True),
    StructField("destination_city", StringType(), True),
    StructField("university_name", StringType(), True),
    StructField("course_name", StringType(), True),
    StructField("field_of_study", StringType(), True),
    StructField("year_of_enrollment", IntegerType(), True),
    StructField("scholarship_received", StringType(), True),
    StructField("enrollment_reason", StringType(), True),
    StructField("graduation_year", IntegerType(), True),
    StructField("placement_status", StringType(), True),
    StructField("placement_country", StringType(), True),
    StructField("placement_company", StringType(), True),
    StructField("starting_salary_usd", IntegerType(), True),
    StructField("gpa_or_score", DoubleType(), True),
    StructField("visa_status", StringType(), True),
    StructField("post_graduation_visa", StringType(), True),
    StructField("language_proficiency_test", StringType(), True),
    StructField("test_score", DoubleType(), True)
])

df = spark.read.schema(schema1).csv("C:/Users/Tejar/Downloads/global_student_migration.csv", header=True, inferSchema=True)


df = df.replace(["N/A", "None"], None)
df = df.dropDuplicates()
numeric_cols = ["starting_salary_usd", "gpa_or_score", "test_score"]
df = df.na.fill(0, subset=numeric_cols)


df = df.withColumn("placement_company", coalesce(df.placement_company, lit("Unknown")))

df.printSchema()
df.show()


root
 |-- student_id: string (nullable = true)
 |-- origin_country: string (nullable = true)
 |-- destination_country: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- university_name: string (nullable = true)
 |-- course_name: string (nullable = true)
 |-- field_of_study: string (nullable = true)
 |-- year_of_enrollment: integer (nullable = true)
 |-- scholarship_received: string (nullable = true)
 |-- enrollment_reason: string (nullable = true)
 |-- graduation_year: integer (nullable = true)
 |-- placement_status: string (nullable = true)
 |-- placement_country: string (nullable = true)
 |-- placement_company: string (nullable = false)
 |-- starting_salary_usd: integer (nullable = true)
 |-- gpa_or_score: double (nullable = false)
 |-- visa_status: string (nullable = true)
 |-- post_graduation_visa: string (nullable = true)
 |-- language_proficiency_test: string (nullable = true)
 |-- test_score: double (nullable = false)

+----------+--------------+-----

In [6]:
df.write.mode("overwrite").csv("cleanedmigrationdata")

In [8]:
import os
from google.cloud import bigquery


os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "C:/Users/Tejar/Downloads/enduring-victor-468209-a4-3d960056faee.json"


client = bigquery.Client()

project_id = "enduring-victor-468209-a4"
dataset_id = "Student_Migration"
table_id = "migration"
csv_file_path = "C:/Users/Tejar/pyspark_practice/cleanedmigrationdata/part-00000-c0b0d640-f641-410a-a846-605f82f953b6-c000.csv"

table_ref = f"{project_id}.{dataset_id}.{table_id}"

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,      
    autodetect=True,           
    write_disposition="WRITE_TRUNCATE")

with open(csv_file_path, "rb") as source_file:
    load_job = client.load_table_from_file(
        source_file,
        destination=table_ref,
        job_config=job_config,
    )


load_job.result()

destination_table = client.get_table(table_ref)
print(f"Loaded {destination_table.num_rows} rows into {table_ref}.") 

Loaded 4999 rows into enduring-victor-468209-a4.Student_Migration.migration.
