In [17]:
pip install google-cloud-bigquery

Collecting google-cloud-bigquery
  Downloading google_cloud_bigquery-3.35.1-py3-none-any.whl.metadata (8.0 kB)
Collecting google-api-core<3.0.0,>=2.11.1 (from google-api-core[grpc]<3.0.0,>=2.11.1->google-cloud-bigquery)
  Downloading google_api_core-2.25.1-py3-none-any.whl.metadata (3.0 kB)
Collecting google-auth<3.0.0,>=2.14.1 (from google-cloud-bigquery)
  Downloading google_auth-2.40.3-py2.py3-none-any.whl.metadata (6.2 kB)
Collecting google-cloud-core<3.0.0,>=2.4.1 (from google-cloud-bigquery)
  Downloading google_cloud_core-2.4.3-py2.py3-none-any.whl.metadata (2.7 kB)
Collecting google-resumable-media<3.0.0,>=2.0.0 (from google-cloud-bigquery)
  Downloading google_resumable_media-2.7.2-py2.py3-none-any.whl.metadata (2.2 kB)
Collecting googleapis-common-protos<2.0.0,>=1.56.2 (from google-api-core<3.0.0,>=2.11.1->google-api-core[grpc]<3.0.0,>=2.11.1->google-cloud-bigquery)
  Downloading googleapis_common_protos-1.70.0-py3-none-any.whl.metadata (9.3 kB)
Collecting protobuf!=3.20.0,!=

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
# Source CSV and a *separate* destination for the cleaned data.
# Spark evaluates lazily, so overwriting the path that is still being read
# would remove the source before it has been consumed. Spark's CSV writer also
# produces a directory of part files, not a single file, so the destination
# must not be the original file path.
INPUT_CSV = "C:/Users/Vasavi kota/Downloads/Ecommerce.csv"
OUTPUT_DIR = "C:/Users/Vasavi kota/Downloads/Ecommerce_cleaned"

# 1. Create SparkSession
spark = SparkSession.builder.appName("EcommerceDataset").getOrCreate()

# 2. Load the CSV file with header and schema inference
raw_df = spark.read.csv(INPUT_CSV, header=True, inferSchema=True)

# 3. Show first few rows of the raw data
raw_df.show()

# 4. Print the inferred schema of the raw DataFrame
raw_df.printSchema()

# 5. Drop rows that are missing the product or the order date
df = raw_df.dropna(subset=["Product", "Order_Date"])

# 6. Fill missing values in less important columns
# NOTE(review): if inferSchema types Customer_Id as numeric, the "Unknown"
# string replacement cannot be applied to it - confirm against the schema
# printed above.
df = df.fillna({
    "Customer_Id": "Unknown",
    "Quantity": 0,
    "Sales": 0.0,
    "Profit": 0.0,
    "Discount": 0.0,
    "Shipping_Cost": 0.0
})

# 7. Cast the categorical columns to string.
# This only changes the column type; the values are not case-normalised.
for col_name in ["Gender", "Device_Type", "Customer_Login_type", "Product_Category", "Product", "Order_Priority", "Payment_method"]:
    df = df.withColumn(col_name, col(col_name).cast("string"))

# 8. Remove duplicate rows if any
df = df.dropDuplicates()

# 9. Final check: show cleaned data and schema
df.show(truncate=False)
df.printSchema()

# 10. Persist once with MEMORY_AND_DISK (safe for larger data). A preceding
# cache() would be redundant: the storage level of an already-persisted
# DataFrame is not changed by a second call.
from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)

# Force an action to materialise the persisted data
df.count()

# Check that it is cached
print("Cached:", df.storageLevel.useMemory)

# 11. Write the cleaned data while it is still persisted, so the write reuses
# the materialised rows instead of recomputing the whole lineage.
df.write.mode("overwrite").csv(OUTPUT_DIR, header=True)

# 12. Release the cached data now that the output has been written
df.unpersist()
print("Unpersisted:", df.storageLevel.useMemory)


+----------+-------------------+-----+-----------+------+-----------+-------------------+------------------+--------------------+-----+--------+--------+------+-------------+--------------+--------------+
|Order_Date|               Time|Aging|Customer_Id|Gender|Device_Type|Customer_Login_type|  Product_Category|             Product|Sales|Quantity|Discount|Profit|Shipping_Cost|Order_Priority|Payment_method|
+----------+-------------------+-----+-----------+------+-----------+-------------------+------------------+--------------------+-----+--------+--------+------+-------------+--------------+--------------+
|2018-01-02|2025-08-11 10:56:33|  8.0|      37077|Female|        Web|             Member|Auto & Accessories|   Car Media Players|140.0|     1.0|     0.3|  46.0|          4.6|        Medium|   credit_card|
|2018-07-24|2025-08-11 20:41:37|  2.0|      59173|Female|        Web|             Member|Auto & Accessories|        Car Speakers|211.0|     1.0|     0.3| 112.0|         11.2|      

In [1]:
from google.cloud import bigquery
import pandas as pd


In [3]:
import os
from google.cloud import bigquery

# Point the Google client libraries at the service-account key file.
# NOTE(review): this is a machine-specific absolute path; prefer setting
# GOOGLE_APPLICATION_CREDENTIALS outside the notebook before sharing it.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "C:/Users/Vasavi kota/Downloads/calcium-aria-468209-b4-e99031666590.json"

client = bigquery.Client()

# Destination table and the local CSV to upload.
project_id = "calcium-aria-468209-b4"
dataset_id = "Ecommerce"
table_id = "E-commerce"
csv_file_path = "C:/Users/Vasavi kota/Downloads/Ecommerce.csv"

# Fully qualified table id: project.dataset.table
table_ref = f"{project_id}.{dataset_id}.{table_id}"

# CSV load: skip the header row, let BigQuery detect the schema, and replace
# any existing contents of the destination table.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

# Upload from the single path variable defined above (it was previously
# defined but unused, with the literal duplicated here). The path must be a
# regular file because it is opened in binary mode.
with open(csv_file_path, "rb") as source_file:
    load_job = client.load_table_from_file(
        source_file,
        destination=table_ref,
        job_config=job_config,
    )

# Block until the load job finishes; raises if the job failed.
load_job.result()

# Report the row count of the destination table after the load.
destination_table = client.get_table(table_ref)
print(f"Loaded {destination_table.num_rows} rows into {table_ref}.")


Loaded 51290 rows into calcium-aria-468209-b4.Ecommerce.E-commerce.
