In [1]:
import os

import pyspark
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

# Reference: https://kashif-sohail.medium.com/read-files-from-google-cloud-storage-bucket-using-local-pyspark-and-jupyter-notebooks-f8bd43f4b42e

## Connect to the Spark Master

In [2]:
# Use the Spark GCS connector so that gs:// paths can be read from this notebook.
#
# The service-account key file is resolved to an absolute path on the driver:
# executors started by the standalone master run in their own working
# directory, so a relative key-file path may not resolve there.
gcs_keyfile = os.path.abspath("./infra/creds.json")

conf = (SparkConf()
        .setMaster("spark://localhost:7077")
        .setAppName("GCSRead")
        # Connector jar that provides the gs:// filesystem implementation.
        .set("spark.jars", "./spark-jars/gcs-connector-hadoop3-latest.jar")
        # Keys prefixed with "spark.hadoop." are passed through to the Hadoop configuration.
        .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
        .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_keyfile)
)

# getOrCreate() returns an already-running session if one exists.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

24/04/11 16:51:48 WARN Utils: Your hostname, manuelpc resolves to a loopback address: 127.0.0.1; using 192.168.2.2 instead (on interface wlo1)
24/04/11 16:51:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/04/11 16:51:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Register the GCS connector classes for the gs:// scheme and enable
# service-account authentication directly on the Hadoop configuration.
# NOTE: `_jsc` is a private PySpark attribute (the JVM-side Spark context handle).
hadoop_conf = spark._jsc.hadoopConfiguration()

hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("google.cloud.auth.service.account.enable", "true")
# Absolute path: a relative key-file path is interpreted against the working
# directory of whichever process reads it, which may differ from the notebook's.
hadoop_conf.set("google.cloud.auth.service.account.json.keyfile", os.path.abspath("./infra/creds.json"))

## Read DataFrame

In [5]:
# Glob that matches every object under Actuele10mindataKNMIstations/2/ in the bucket.
bucket_name = "weather_data_de_bucket"
path = f"gs://{bucket_name}/Actuele10mindataKNMIstations/2/*"

# Load the matched objects as Parquet and print the first five rows.
# NOTE(review): the output recorded for this cell reports a CSV schema-inference
# error, which does not match a Parquet read — re-run the cell to confirm.
df = spark.read.format("parquet").load(path)
df.show(5)

AnalysisException: [UNABLE_TO_INFER_SCHEMA] Unable to infer schema for CSV. It must be specified manually.

### Schema Read

In [None]:
# Explicit schema, so column types are declared up front instead of inferred.
from pyspark.sql import types

# Built field by field; add() leaves a field nullable unless told otherwise.
schema = (
    types.StructType()
    .add("dispatching_base_num", types.StringType())
    .add("pickup_datetime", types.TimestampType())
    .add("dropoff_datetime", types.TimestampType())
    .add("PULocationID", types.IntegerType())
    .add("DOLocationID", types.IntegerType())
    .add("SR_Flag", types.IntegerType(), True)
)

In [None]:
# Read the CSV with its header row, applying the explicit schema rather than inferring one.
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv("fhv_tripdata_2019-10.csv")
)

In [None]:
# Fetch the first five rows to the driver as a list of Row objects.
df.take(5)

## Save as Parquet

In [None]:
# Repartition into 6 partitions, then write as Parquet, replacing any existing output.
(
    df.repartition(6)
    .write
    .mode("overwrite")
    .parquet("fhvtripdata/2019/10/")
)

### Read from Parquet

In [None]:
# Load the Parquet output back into a DataFrame.
df = spark.read.format("parquet").load("fhvtripdata/2019/10/")

In [None]:
# Print the DataFrame's schema as a tree to check the column types.
df.printSchema()