In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .appName('calendar migration') \
  .getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/21 03:42:40 INFO SparkEnv: Registering MapOutputTracker
24/01/21 03:42:40 INFO SparkEnv: Registering BlockManagerMaster
24/01/21 03:42:40 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/01/21 03:42:40 INFO SparkEnv: Registering OutputCommitCoordinator


In [2]:
rawdata_bucket = 'airbnb-raw-data'   
prefix = 'japan/27-12-2023/'    # Should be parameterized

In [3]:
from google.cloud import storage

storage_client = storage.Client()
blobs = storage_client.list_blobs(rawdata_bucket,prefix=prefix)

for blob in blobs:
    if not blob.name.endswith('/'):
        print(blob.name)

japan/27-12-2023/calendar.csv
japan/27-12-2023/listings.csv
japan/27-12-2023/neighbourhoods.geojson
japan/27-12-2023/reviews.csv


In [4]:
cal_df = spark \
  .read \
  .option ( "header" , "true" ) \
  .csv ( f"gs://{rawdata_bucket}/japan/27-12-2023/calendar.csv" )

cal_df

                                                                                

listing_id,date,available,price,adjusted_price,minimum_nights,maximum_nights
197677,2023-12-28,f,"$12,000.00",,3,1125
197677,2023-12-29,f,"$12,000.00",,3,1125
197677,2023-12-30,f,"$12,000.00",,3,1125
197677,2023-12-31,f,"$12,000.00",,3,1125
197677,2024-01-01,f,"$12,000.00",,3,1125
197677,2024-01-02,f,"$12,000.00",,3,1125
197677,2024-01-03,f,"$12,000.00",,3,1125
197677,2024-01-04,f,"$12,000.00",,3,1125
197677,2024-01-05,f,"$12,000.00",,3,1125
197677,2024-01-06,f,"$12,000.00",,3,1125


In [5]:
cal_df.printSchema()
print('No of paritions : ', cal_df.rdd.getNumPartitions() )
# print('No of rows in df : ', cal_df.count())
# print('No of null rows in df :',cal_df.na.drop().count())

root
 |-- listing_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- available: string (nullable = true)
 |-- price: string (nullable = true)
 |-- adjusted_price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- maximum_nights: string (nullable = true)

No of paritions :  2


In [6]:
import pyspark.sql.functions as F

# check if any cols has null values

def print_na_cols(df):
    for c in df.columns:
        print(f'For col {c}:',df.filter(F.col(c).isNull()).count())
        


In [7]:

cal_df.select(F.regexp_replace(cal_df.price, '[\$,]', ''))

                                                                                

"regexp_replace(price, [\$,], , 1)"
12000.0
12000.0
12000.0
12000.0
12000.0
12000.0
12000.0
12000.0
12000.0
12000.0


In [8]:
import re 
import pyspark.sql.types as T


cal_df = cal_df.withColumn('date',cal_df['date'].cast(T.DateType())).\
                withColumn('available',cal_df['available'].cast(T.BooleanType())).\
                withColumn('price',  F.regexp_replace(cal_df['price'], '[\$,]', '').cast(T.DecimalType(38,9)))
cal_df = cal_df.drop('adjusted_price')
cal_df.printSchema()


root
 |-- listing_id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- available: boolean (nullable = true)
 |-- price: decimal(38,9) (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- maximum_nights: string (nullable = true)



In [9]:


cal_df = cal_df.withColumn('minimum_nights',cal_df['minimum_nights'].cast(T.LongType())).\
                withColumn('maximum_nights',cal_df['maximum_nights'].cast(T.LongType())).\
                withColumn('listing_id',cal_df['listing_id'].cast(T.LongType()))
cal_df.printSchema()

root
 |-- listing_id: long (nullable = true)
 |-- date: date (nullable = true)
 |-- available: boolean (nullable = true)
 |-- price: decimal(38,9) (nullable = true)
 |-- minimum_nights: long (nullable = true)
 |-- maximum_nights: long (nullable = true)



In [16]:
    
def read_from_bq(dataset,table):
    return spark.read.format("bigquery").option("table","{}.{}".format(dataset, table)).load()

In [19]:
table = 'dim_calendar'
dataset = 'airbnb'
gcs_staging_bucket = 'airbnb-staging-data'
gcs_staging_data_path = 'gs://{}/{}/'.format(gcs_staging_bucket,prefix)
print(gcs_staging_data_path)
cal_df.coalesce(1).write \
  .mode('overwrite') \
  .parquet(gcs_staging_data_path)

#df = read_from_bq(dataset,table)
#df.printSchema()

gs://airbnb-staging-data/japan/27-12-2023//


                                                                                

24/01/21 03:56:27 WARN BigQueryDataSourceWriterInsertableRelation: It seems that 2 out of 0 partitions have failed, aborting
24/01/21 03:56:27 WARN BigQueryDirectDataSourceWriterContext: BigQuery Data Source writer dbd14af1-cada-4854-b6c9-ff921f090c16 aborted
