In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create a new Spark session with GCS connector
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GCSWriter") \
    .master("local[*]") \
    .config("spark.jars.packages", 
            "com.google.cloud:google-cloud-storage:2.24.0," +
            "org.apache.hadoop:hadoop-gcp:3.3.4," +
            "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.16") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "false") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
    .getOrCreate()

# You'll need to reload your data after restarting Spark
# For example:
# df = spark.read.parquet("../../output/events")

# Once data is loaded, get OAuth token again
import subprocess
token = subprocess.check_output('gcloud auth print-access-token', shell=True).decode('utf-8').strip()
spark.conf.set("spark.hadoop.google.cloud.auth.access.token", token)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/06/04 20:27:12 WARN Utils: Your hostname, Aarons-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 192.168.0.111 instead (on interface en0)
25/06/04 20:27:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/Users/aaronginder/Git/event-store/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/aaronginder/.ivy2.5.2/cache
The jars for the packages stored in: /Users/aaronginder/.ivy2.5.2/jars
com.google.cloud#google-cloud-storage added as a dependency
org.apache.hadoop#hadoop-gcp added as a dependency
com.google.cloud.bigdataoss#gcs-connector added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a5d8dfb8-385a-4bbf-951f-3a80bc3f6d57;1.0
	confs: [default]
	found com.google.cloud#google-cloud-storage;2.24.0 in ce

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [None]:
# First, let's print the absolute path to help debug
import os
print(f"Current working directory: {os.getcwd()}")

# Then read the parquet files correctly
output_path = "../../output/events"  # Adjust this to your actual path
abs_path = os.path.abspath(output_path)
print(f"Attempting to read from: {abs_path}")

# Read the parquet files (no header or inferSchema needed)
df = spark.read.parquet(output_path)

# Check if data was read successfully
print(f"Number of rows: {df.count()}")
print("Schema:")
df.printSchema()
print("Sample data:")
df.show(5, truncate=False)

In [None]:
df.show()
df.describe().show()
print(df.count())

In [None]:
from pyspark.sql.functions import col, date_format

# Add partition columns
df = df.withColumn("year", date_format(col("event_timestamp"), "yyyy")) \
    .withColumn("month", date_format(col("event_timestamp"), "MM")) \
    .withColumn("day", date_format(col("event_timestamp"), "dd"))

In [None]:
# df.write.mode("overwrite").partitionBy("year", "month", "day")\
#     .parquet("../data/")

In [None]:
from pyspark.sql.functions import col, date_format, to_date

# Configure GCS access using your OAuth credentials
spark.conf.set("spark.hadoop.google.cloud.auth.service.account.enable", "false")
spark.conf.set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
spark.conf.set("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

# If you need to explicitly specify credentials (usually not needed if you've run gcloud auth login)
# This uses credentials from gcloud CLI
import subprocess
token = subprocess.check_output('gcloud auth print-access-token', shell=True).decode('utf-8').strip()
spark.conf.set("spark.hadoop.google.cloud.auth.access.token", token)

# Ensure you have a date column
if "event_timestamp" in df.columns:
    df = df.withColumn("Date", to_date("event_timestamp"))

# Add partition columns
df = df.withColumn("year", date_format(col("Date"), "yyyy")) \
    .withColumn("month", date_format(col("Date"), "MM")) \
    .withColumn("day", date_format(col("Date"), "dd"))

# Define GCS output path with your bucket
output_path = "gs://aaronginder-sample-datasets/events"

# Write partitioned Parquet with Snappy compression
df.write \
    .partitionBy("year", "month", "day") \
    .option("compression", "snappy") \
    .mode("overwrite") \
    .parquet(output_path)

print(f"Data written to: {output_path}")

In [None]:
spark.stop()