In [0]:
%spark.pyspark

# GCS locations used by every later cell in this notebook.
raw_bucket = "cricket-raw"              # bucket holding the raw JSON match files
processed_bucket = "cricket-processed"  # destination bucket; make sure it already exists

# For now only the T20 JSON folder is read; results go to a scratch "test_output" prefix.
input_path = "gs://{}/t20_json/".format(raw_bucket)
output_path = "gs://{}/test_output".format(processed_bucket)

print(f"Input path set to: {input_path}")
print(f"Output path set to: {output_path}")

In [1]:
%spark.pyspark

# Step 1: Pull one (file_path, file_content) pair back to the driver so we can
# inspect how Spark reads a single file. Note: take(1) returns a plain Python
# list (empty when the folder has no files), not an RDD, despite the variable name.
first_file_rdd = spark.sparkContext.wholeTextFiles(input_path).take(1)
if not first_file_rdd:
    # Fail with a readable message instead of an IndexError on [0] below.
    raise ValueError(f"No input files found under {input_path}")

# Keep only the file content (element [1]); the path (element [0]) is not needed.
first_file_content = first_file_rdd[0][1]

# Step 2: Parallelize that single file's text into an RDD of one string and let
# Spark parse it as JSON into a DataFrame.
single_file_df = spark.read.option("multiLine", "true").json(
    spark.sparkContext.parallelize([first_file_content])
)

# Step 3: Print the schema that Spark infers from just that one file.
print("Schema inferred by Spark from one file:")
single_file_df.printSchema()

In [2]:
%spark.pyspark

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType


def _field(name, data_type):
    """Shorthand for a nullable StructField; every field in this schema is nullable."""
    return StructField(name, data_type, True)


# Nested structs, defined first so the top-level schema below reads as a flat list.
event_struct = StructType([
    _field("name", StringType()),
    _field("match_number", LongType()),
])

outcome_struct = StructType([
    _field("winner", StringType()),
    # Margin of victory: one of runs / wickets is expected to be set per match.
    _field("by", StructType([
        _field("runs", LongType()),
        _field("wickets", LongType()),
    ])),
])

toss_struct = StructType([
    _field("decision", StringType()),
    _field("winner", StringType()),
])

# Full explicit schema for the match JSON files in GCS.
# NOTE(review): every field is read from the top level of each JSON document. If the
# files nest this metadata under another key, these columns will all be null —
# compare against the schema printed by the single-file probe cell.
defined_schema = StructType([
    _field("city", StringType()),
    _field("dates", ArrayType(StringType())),
    _field("event", event_struct),
    _field("gender", StringType()),
    _field("match_type", StringType()),
    _field("outcome", outcome_struct),
    _field("overs", StringType()),
    _field("player_of_match", ArrayType(StringType())),
    _field("teams", ArrayType(StringType())),
    _field("toss", toss_struct),
    _field("venue", StringType()),
])

# Each file is one (possibly pretty-printed) JSON document, hence multiLine.
raw_df = (
    spark.read
    .option("multiLine", "true")
    .schema(defined_schema)
    .json(input_path)
)
raw_df.cache()  # reused by the flattening cell below

print("Successfully loaded data with the full schema.")
raw_df.printSchema()

In [3]:
%spark.pyspark

from pyspark.sql.functions import col

# Output columns, in order. Array fields keep only selected elements:
# the first date, the first player of the match, and the two teams.
flattened_columns = [
    # Match context
    col("city"),
    col("dates")[0].alias("match_date"),
    col("event.name").alias("event_name"),
    col("gender"),
    col("match_type").alias("match_type_detail"),
    # Result
    col("outcome.winner").alias("winner"),
    col("outcome.by.wickets").alias("win_by_wickets"),
    col("outcome.by.runs").alias("win_by_runs"),
    col("overs"),
    col("player_of_match")[0].alias("player_of_match"),
    # Participants and toss
    col("teams")[0].alias("team_1"),
    col("teams")[1].alias("team_2"),
    col("toss.decision").alias("toss_decision"),
    col("toss.winner").alias("toss_winner"),
    col("venue"),
]
final_df = raw_df.select(*flattened_columns)

# Register for ad-hoc SQL in later paragraphs.
final_df.createOrReplaceTempView("cricket_view_final")

print("Final transformed data sample:")
final_df.show(10)

In [4]:
%spark.pyspark

from pyspark.sql import functions as F

# Add the partition columns used by the Parquet write:
#   year       - calendar year of match_date. to_date() without a format expects
#                ISO "yyyy-MM-dd" strings. NOTE(review): confirm the format of `dates`
#                in the source files; values that don't parse will not yield a year.
#   match_type - constant label for this batch; keep in sync with input_path (t20_json).
partitioned_df = (
    final_df
    .withColumn("year", F.year(F.to_date(F.col("match_date"))))
    .withColumn("match_type", F.lit("t20"))
)

# Nothing is written here; the write happens in a later cell.
print(f"Added partition columns 'match_type' and 'year'; output will go to: {output_path}")


In [5]:
%spark.pyspark
# Quick sanity check before writing: column list, then row count.
print(f"partitioned_df columns: {partitioned_df.columns}")
row_count = partitioned_df.count()  # an action: runs a Spark job over the data
print(f"partitioned_df count: {row_count}")

In [6]:
%spark.pyspark
# Write the final DataFrame to the GCS output path as Parquet, laid out as
# match_type=<...>/year=<...>/ directories.
# "dynamic" partition overwrite replaces only the (match_type, year) partitions
# present in partitioned_df. Spark's default (static) mode would delete everything
# under output_path first — e.g. wiping the T20 partitions when another match type
# is written to the same path later.
print(f"Writing partitioned data to: {output_path}")
(
    partitioned_df.write
    .partitionBy("match_type", "year")
    .mode("overwrite")
    .option("partitionOverwriteMode", "dynamic")
    .parquet(output_path)
)

print("\nWrite complete!")

In [7]:
%spark.pyspark
