In [1]:
import pyspark
from pyspark.sql import types
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [2]:
# Show which PySpark installation the kernel actually imported.
getattr(pyspark, "__file__")

'/usr/local/spark/python/pyspark/__init__.py'

In [2]:
# Path to the GCP service-account key, shared by the Spark and Hadoop GCS settings below.
GOOGLE_CREDENTIALS_PATH = "/.google/credentials/google_credentials.json"
# GCS connector and BigQuery connector jars (relative paths, resolved from the kernel's working directory).
SPARK_JARS = ",".join([
    "../spark/resources/jars/gcs-connector-hadoop3-latest.jar",
    "../spark/resources/jars/spark-bigquery-latest_2.12.jar",
])

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", SPARK_JARS) \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", GOOGLE_CREDENTIALS_PATH)

# getOrCreate (instead of the SparkContext constructor) lets this cell be re-run without
# "Cannot run multiple SparkContexts at once". Note: if a context already exists it is
# returned unchanged and `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)

# Register the GCS connector as the filesystem for gs:// paths and point it at the key file.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", GOOGLE_CREDENTIALS_PATH)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

22/04/04 07:49:25 WARN Utils: Your hostname, sparkdev resolves to a loopback address: 127.0.1.1; using 10.0.2.23 instead (on interface enp0s3)
22/04/04 07:49:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/04/04 07:49:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Build the SparkSession on top of the SparkContext configured above; getOrCreate()
# reuses the running `sc`, so its GCS/BigQuery settings carry over to `spark`.
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [4]:
# Initial read of the raw artists file (all columns) to inspect its schema.
# Parquet stores its own schema, so the CSV-only "header" option is not needed.
df_artists = spark.read.parquet("gs://dtc_data_lake_applied-mystery-341809/raw/artists.parquet")

                                                                                

In [5]:
# Print the column names and types read from the parquet file's embedded schema.
df_artists.printSchema()

root
 |-- id: string (nullable = true)
 |-- followers: double (nullable = true)
 |-- genres: string (nullable = true)
 |-- name: string (nullable = true)
 |-- popularity: long (nullable = true)



In [8]:
# Explicit read schema for the artists file: only id, followers and name.
# The raw file's genres and popularity columns are not included (presumably
# not needed for dim_artists — confirm).
schema_artists = types.StructType([
    types.StructField(col_name, col_type, True)
    for col_name, col_type in (
        ('id', types.StringType()),
        ('followers', types.DoubleType()),
        ('name', types.StringType()),
    )
])

In [9]:
# Re-read the raw file with the explicit schema so only the columns in
# schema_artists are loaded. The CSV-only "header" option has no effect on parquet.
df_artists = spark.read \
    .schema(schema_artists) \
    .parquet("gs://dtc_data_lake_applied-mystery-341809/raw/artists.parquet")

In [10]:
# Number of partitions to spread the artists data across before writing it out.
ARTISTS_NUM_PARTITIONS = 8
df_artists = df_artists.repartition(ARTISTS_NUM_PARTITIONS)

In [4]:
# df_artists = spark.read \
# .option("header",True) \
# .parquet("gs://dtc_data_lake_applied-mystery-341809/transform/artists")

                                                                                

In [5]:
# df_artists.show(10)

                                                                                

+--------------------+---------+--------------------+
|                  id|followers|                name|
+--------------------+---------+--------------------+
|59qz10hjQPAs0spos...|     18.0|    Boys of the Band|
|0LcrfJ63GdIW4n2UZ...|      0.0|David Mackersie e...|
|18lEjlk2JBPdyObT3...|    785.0|Ministerio Doble ...|
|1UhZGGHbPPHAHt2ce...|    214.0|  Andre Tschaskowski|
|5D95DPWHohrzVbdub...|  10557.0|       Advent Sorrow|
|11CbG4ImkEw99aUng...|   3614.0|                  MX|
|61MH29rMIyOfuK7KX...|  27878.0| The Vintage Caravan|
|5lInFfKjIAVzBIOg4...|    445.0|     Charles Grigsby|
|3kJakFcxRwLW9f47x...|      4.0|        Willy Rustad|
|508weSx4HBumrGggF...|   1110.0|              Hunxho|
+--------------------+---------+--------------------+
only showing top 10 rows



In [11]:
# Update to your GCS bucket (the BigQuery connector stages data here via temporaryGcsBucket)
gcs_bucket = 'dtc_data_lake_applied-mystery-341809'

# Update to the name of the BigQuery dataset you created
bq_dataset = 'bq_project_dezoomcamp'

# BigQuery table to create or overwrite.
# If the table does not exist, it is created when the write below runs.
bq_table = 'dim_artists'

# mode('overwrite') replaces any existing contents of the target table.
df_artists.write \
    .format("bigquery") \
    .option("table", f"{bq_dataset}.{bq_table}") \
    .option("temporaryGcsBucket", gcs_bucket) \
    .mode('overwrite') \
    .save()

                                                                                