In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [2]:
credentials_location = '/home/viktorija/.gc/prices.json'

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [3]:
sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

25/03/23 10:38:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [82]:
import pandas as pd

In [83]:
from pyspark.sql import types

In [84]:
df_schema = types.StructType([
    types.StructField("id", types.IntegerType(), True),
    types.StructField("name", types.StringType(), True),
    types.StructField("host_id", types.IntegerType(), True),
    types.StructField("host_name", types.StringType(), True),
    types.StructField("neighbourhood_group", types.StringType(), True),
    types.StructField("neighbourhood", types.StringType(), True),
    types.StructField("latitude", types.StringType(), True),
    types.StructField("longitude", types.StringType(), True),
    types.StructField("room_type", types.StringType(), True),
    types.StructField("price", types.DoubleType(), True),
    types.StructField("minimum_nights", types.IntegerType(), True),
    types.StructField("number_of_reviews", types.IntegerType(), True),
    types.StructField("last_review",types.DateType(), True),  
    types.StructField("reviews_per_month", types.IntegerType(), True),
    types.StructField("calculated_host_listings_count", types.IntegerType(), True),
    types.StructField("availability_365", types.IntegerType(), True),
    types.StructField("number_of_reviews_ltm", types.IntegerType(), True),
    types.StructField("license", types.StringType(), True),
    types.StructField("country", types.StringType(), True),
    types.StructField("region", types.StringType(), True),
    types.StructField("city", types.StringType(), True),
    types.StructField("release_date", types.DateType(), True) 
])


In [85]:
from pyspark.sql import functions as F
from pyspark.sql.functions import md5, concat_ws

In [86]:
df_spain = spark.read\
    .option("header", "true") \
    .schema(df_schema) \
    .csv('gs://airbnb-prices-eu-bucket/Spain/*')

In [87]:
df_spain.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: double (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: date (nullable = true)
 |-- reviews_per_month: integer (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)
 |-- number_of_reviews_ltm: integer (nullable = true)
 |-- license: string (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- city: string (nullable = true)
 |-- release_date: date (nullable = true)



In [88]:
 df_spain_uni = df_spain.withColumn("unique_row_id", md5(concat_ws("|", "country", "region", "city", "release_date", "id", "host_id")))

In [89]:
df_spain_uni.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: double (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: date (nullable = true)
 |-- reviews_per_month: integer (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)
 |-- number_of_reviews_ltm: integer (nullable = true)
 |-- license: string (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- city: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- unique_row_id: string (nullable = f

In [90]:
df_spain_uni.head(2)

[Row(id=21853, name='Bright and airy room', host_id=83531, host_name='Abdel', neighbourhood_group='Latina', neighbourhood='Cármenes', latitude='40.40381', longitude='-3.7413', room_type='Private room', price=29.0, minimum_nights=4, number_of_reviews=33, last_review=datetime.date(2018, 7, 15), reviews_per_month=None, calculated_host_listings_count=2, availability_365=233, number_of_reviews_ltm=0, license=None, country='spain', region='comunidad-de-madrid', city='madrid', release_date=datetime.date(2025, 3, 5), unique_row_id='180dd48d4d8c14fa93c6f14e0025e226'),
 Row(id=30320, name='Great Vacational Apartments', host_id=130907, host_name='Dana', neighbourhood_group='Centro', neighbourhood='Sol', latitude='40.41476', longitude='-3.70418', room_type='Entire home/apt', price=None, minimum_nights=5, number_of_reviews=172, last_review=datetime.date(2022, 9, 26), reviews_per_month=None, calculated_host_listings_count=3, availability_365=0, number_of_reviews_ltm=0, license=None, country='spain',

In [91]:
df_spain_uni.createOrReplaceTempView('spain_listings_temp')

In [92]:
df_spain_bq = spark.sql("""
SELECT  *    
FROM
    spain_listings_temp
""")

In [94]:
df_spain_bq.write.format('bigquery') \
    .option('table', 'airbnb_prices_eu_dataset.spain_temp') \
    .option('temporaryGcsBucket', 'dataproc-temp-europe-west1-95215738418-sjnvgog3') \
    .save()

Py4JJavaError: An error occurred while calling o668.save.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: bigquery. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:873)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:260)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: bigquery.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 16 more
