In [3]:
import os
from google.cloud import bigquery
from pyspark.sql import SparkSession

# Set Google Cloud credentials (Path to your service account JSON file)
# Set Google Cloud authentication with container path
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/app/credentials.json"

# Set Google Cloud authentication with local host
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "big-test-449715-2b0e9010365e.json"

# Initialize BigQuery client
client = bigquery.Client()

# List datasets in your project
datasets = list(client.list_datasets())

if datasets:
    print("BigQuery authentication successful! Datasets found:")
    for dataset in datasets:
        print(f"- {dataset.dataset_id}")
else:
    print("No datasets found, but authentication is working.")

# Define Google Cloud Project ID
PROJECT_ID = "big-test-449715"
DATASET_ID = "LLM"
TABLE_NAME = "review_amazon"

# Initialize Spark session with BigQuery support
spark = SparkSession.builder \
    .appName("AmazonReviewProcessing") \
    .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.32.2") \
    .config("parentProject", PROJECT_ID) \
    .getOrCreate()

print("PySpark Session Initialized!")

# Retrieve data from BigQuery into Spark DataFrame
df = spark.read.format("bigquery") \
    .option("table", f"{PROJECT_ID}.{DATASET_ID}.{TABLE_NAME}") \
    .option("credentialsFile", "big-test-449715-2b0e9010365e.json").load()

df.show(1000)

# Step 2: Preprocess Data
df_cleaned = df.withColumn("review_text", lower(col("review_text")))  # Convert text to lowercase
df_cleaned = df_cleaned.withColumn("review_text", regexp_replace(col("review_text"), "[^a-zA-Z0-9 ]", ""))  # Remove special chars
df_cleaned = df_cleaned.dropna()  # Remove missing values

print("✅ Data Preprocessing Done!")

# Step 3: Write Processed Data Back to BigQuery
df_cleaned.write.format("bigquery") \
    .option("table", f"{PROJECT_ID}.{DATASET_ID}.{DESTINATION_TABLE}") \
    .option("credentialsFile", "big-test-449715-2b0e9010365e.json") \
    .mode("overwrite") \
    .save()

print("✅ Processed Data Saved to BigQuery!")

# Stop Spark Session
spark.stop()
print("✅ Spark Session Stopped!")

BigQuery authentication successful! Datasets found:
- LLM
PySpark Session Initialized!
+------+--------------------+--------------------+
|rating|               title|              review|
+------+--------------------+--------------------+
|     1|Batteries died wi...|I bought this cha...|
|     1|DVD Player crappe...|I also began havi...|
|     1|      Incorrect Disc|I love the style ...|
|     1|DVD menu select p...|I cannot scroll t...|
|     1|Not an "ultimate ...|Firstly,I enjoyed...|
|     1|                Not!|If you want to li...|
|     1|     A complete Bust|This game require...|
|     1|didn't run off of...|Was hoping that t...|
|     1|          Don't buy!|First of all, the...|
|     1|     Long and boring|I've read this bo...|
|     1|        Dont like it|This product smel...|
|     1|Don't Take the Ch...|If you purchase t...|
|     1|     Waste of money!|Like many of the ...|
|     1|        Has No Range|I suppose if you ...|
|     1|Three Days of Use...|Very disappointed

NameError: name 'lower' is not defined

In [7]:
import os
from google.cloud import bigquery
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, regexp_replace, col

# Set Google Cloud credentials (Path to your service account JSON file)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "big-test-449715-2b0e9010365e.json"

# Initialize BigQuery client
client = bigquery.Client()

# Define Google Cloud Project ID and Dataset
PROJECT_ID = "big-test-449715"
DATASET_ID = "LLM"
TABLE_NAME = "review_amazon"
DESTINATION_TABLE = "preprocessed_dataset"  # Destination table in BigQuery

# List datasets in your project (just to confirm connection)
datasets = list(client.list_datasets())
if datasets:
    print("BigQuery authentication successful! Datasets found:")
    for dataset in datasets:
        print(f"- {dataset.dataset_id}")
else:
    print("No datasets found, but authentication is working.")

# Initialize Spark session with BigQuery support
spark = SparkSession.builder \
    .appName("AmazonReviewProcessing") \
    .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.32.2") \
    .config("parentProject", PROJECT_ID) \
    .getOrCreate()

print("✅ PySpark Session Initialized!")

# Retrieve data from BigQuery into Spark DataFrame (limiting to 1000 records)
df = spark.read.format("bigquery") \
    .option("table", f"{PROJECT_ID}.{DATASET_ID}.{TABLE_NAME}") \
    .option("credentialsFile", "big-test-449715-2b0e9010365e.json").load()

# Limit to 1000 records
df_limited = df.limit(1000)

print("✅ Data Loaded from BigQuery (Limited to 1000 Records)!")

# Step 2: Preprocess Data
# Perform transformations such as converting text to lowercase, removing special characters, and dropping missing values
df_cleaned = df_limited.withColumn("review", lower(col("review")))  # Convert text to lowercase
df_cleaned = df_cleaned.withColumn("review", regexp_replace(col("review"), "[^a-zA-Z0-9 ]", ""))  # Remove special chars
df_cleaned = df_cleaned.dropna()  # Remove missing values

print("✅ Data Preprocessing Done!")

# Step 3: Write Processed Data Back to BigQuery
df_cleaned.write.format("bigquery") \
    .option("table", f"{PROJECT_ID}.{DATASET_ID}.{DESTINATION_TABLE}") \
    .option("credentialsFile", "big-test-449715-2b0e9010365e.json") \
    .mode("overwrite") \
    .save()

print("✅ Processed Data Saved to BigQuery!")

# Stop Spark session
spark.stop()


BigQuery authentication successful! Datasets found:
- LLM
✅ PySpark Session Initialized!
✅ Data Loaded from BigQuery (Limited to 1000 Records)!
✅ Data Preprocessing Done!


IllegalArgumentException: Either temporary or persistent GCS bucket must be set

In [None]:
import os
from google.cloud import bigquery
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, regexp_replace, col

# Set Google Cloud credentials (Path to your service account JSON file)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "big-test-449715-2b0e9010365e.json"
os.environ["GOOGLE_CLOUD_STORAGE_BUCKET"] = "your-bucket-name"  # Set your GCS bucket name

# Initialize BigQuery client
client = bigquery.Client()

# Define Google Cloud Project ID and Dataset
PROJECT_ID = "big-test-449715"
DATASET_ID = "LLM"
TABLE_NAME = "review_amazon"
DESTINATION_TABLE = "preprocessed_dataset"  # Destination table in BigQuery

# List datasets in your project (just to confirm connection)
datasets = list(client.list_datasets())
if datasets:
    print("BigQuery authentication successful! Datasets found:")
    for dataset in datasets:
        print(f"- {dataset.dataset_id}")
else:
    print("No datasets found, but authentication is working.")

# Initialize Spark session with BigQuery support
spark = SparkSession.builder \
    .appName("AmazonReviewProcessing") \
    .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.32.2") \
    .config("parentProject", PROJECT_ID) \
    .config("spark.hadoop.fs.gs.bucket", "your-bucket-name") \  # Add your GCS bucket name here
    .getOrCreate()

print("✅ PySpark Session Initialized!")

# Retrieve data from BigQuery into Spark DataFrame (limiting to 1000 records)
df = spark.read.format("bigquery") \
    .option("table", f"{PROJECT_ID}.{DATASET_ID}.{TABLE_NAME}") \
    .option("credentialsFile", "big-test-449715-2b0e9010365e.json").load()

# Limit to 1000 records
df_limited = df.limit(1000)

print("✅ Data Loaded from BigQuery (Limited to 1000 Records)!")

# Step 2: Preprocess Data
# Perform transformations such as converting text to lowercase, removing special characters, and dropping missing values
df_cleaned = df_limited.withColumn("review_text", lower(col("review_text")))  # Convert text to lowercase
df_cleaned = df_cleaned.withColumn("review_text", regexp_replace(col("review_text"), "[^a-zA-Z0-9 ]", ""))  # Remove special chars
df_cleaned = df_cleaned.dropna()  # Remove missing values

print("✅ Data Preprocessing Done!")

# Step 3: Write Processed Data Back to BigQuery
df_cleaned.write.format("bigquery") \
    .option("table", f"{PROJECT_ID}.{DATASET_ID}.{DESTINATION_TABLE}") \
    .option("credentialsFile", "big-test-449715-2b0e9010365e.json") \
    .mode("overwrite") \
    .save()

print("✅ Processed Data Saved to BigQuery!")

# Stop Spark session
spark.stop()
