In [35]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings("ignore")

# Set up your Spark session
spark = SparkSession.builder.appName("ReadParquetFromCloudStorage").config("spark.jars", "gs://data-bucket420/gcs-connector-hadoop2-2.1.1.jar").getOrCreate()
# ~/spark/spark-packages/gcs-connector-hadoop2-latest.jar
# Replace with your Google Cloud Storage project ID
project_id = "playground-s-11-47dcb5d9"

# Replace with the path to your Parquet file in cloud storage (including bucket name)
parquet_file_path = "gs://data-bucket420/FactStoreBudget_sample.snappy.parquet"

# Optional: Specify whether the file has a header row (if not, set `header=False`)
header_present = True

# Read the Parquet file into a DataFrame
df = spark.read \
    .format("parquet") \
    .option("header", header_present) \
    .load(parquet_file_path)

# Print the DataFrame schema to get an overview of the data
print("DataFrame schema:")
# df.printSchema()




24/02/15 15:30:07 INFO SparkEnv: Registering MapOutputTracker
24/02/15 15:30:07 INFO SparkEnv: Registering BlockManagerMaster
24/02/15 15:30:07 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/02/15 15:30:07 INFO SparkEnv: Registering OutputCommitCoordinator
[Stage 0:>                                                          (0 + 1) / 1]

DataFrame schema:


                                                                                

In [34]:
spark.stop()


In [36]:
df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+-------+------------+--------+--------+--------------+--------------------+
|StoreId|BusinessDate|   Sales|  Margin|SourceSystemId|     sysDateModified|
+-------+------------+--------+--------+--------------+--------------------+
| 151001|  2021-03-04| 2154.61|  901.96|             1|2022-01-05 07:50:...|
| 119142|  2021-01-15| 2568.12| 1119.65|             1|2022-01-05 07:50:...|
| 025099|  2021-02-03| 4135.94| 1769.75|             1|2022-01-05 07:50:...|
| 092005|  2020-03-01|     0.0|     0.0|             1|2022-01-05 07:50:...|
| 076004|  2020-08-02| 1700.02|  841.85|             1|2022-01-05 07:50:...|
| 001041|  2020-08-21| 4931.03|  1794.4|             1|2022-01-05 07:50:...|
| 001155|  2021-10-19| 4753.36| 1884.54|             1|2022-01-05 07:50:...|
| 066012|  2020-11-12| 5484.45| 2284.25|             1|2022-01-05 07:50:...|
| 166013|  2020-04-11| 3829.36| 1740.44|             1|2022-01-05 07:50:...|
| 513009|  2020-04-03| 2100.82|  824.78|             1|2022-01-05 07:50:...|

                                                                                

In [37]:
spark.catalog.listTables()

[]

In [38]:
df.createOrReplaceTempView("Sales") 

In [39]:
result = spark.sql("Select * from sales;")

In [40]:
result.show()

[Stage 2:>                                                          (0 + 1) / 1]

+-------+------------+--------+--------+--------------+--------------------+
|StoreId|BusinessDate|   Sales|  Margin|SourceSystemId|     sysDateModified|
+-------+------------+--------+--------+--------------+--------------------+
| 151001|  2021-03-04| 2154.61|  901.96|             1|2022-01-05 07:50:...|
| 119142|  2021-01-15| 2568.12| 1119.65|             1|2022-01-05 07:50:...|
| 025099|  2021-02-03| 4135.94| 1769.75|             1|2022-01-05 07:50:...|
| 092005|  2020-03-01|     0.0|     0.0|             1|2022-01-05 07:50:...|
| 076004|  2020-08-02| 1700.02|  841.85|             1|2022-01-05 07:50:...|
| 001041|  2020-08-21| 4931.03|  1794.4|             1|2022-01-05 07:50:...|
| 001155|  2021-10-19| 4753.36| 1884.54|             1|2022-01-05 07:50:...|
| 066012|  2020-11-12| 5484.45| 2284.25|             1|2022-01-05 07:50:...|
| 166013|  2020-04-11| 3829.36| 1740.44|             1|2022-01-05 07:50:...|
| 513009|  2020-04-03| 2100.82|  824.78|             1|2022-01-05 07:50:...|

                                                                                

In [41]:
bucket = "data-bucket420"
spark.conf.set('temporaryGcsBucket', bucket)

In [33]:
result.write.format('bigquery') \
    .option('table', 'sales.sales_data_complete') \
    .mode("append") \
    .save()

                                                                                