Task 5: Write Import Statements

In [1]:
import apache_beam as beam
import csv
import os 
from typing import NamedTuple
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from google.cloud import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
from datetime import datetime

Task 6: Create a Pipeline Object with Options

In [2]:
# Pipeline configuration. `flags=[]` is passed explicitly so PipelineOptions
# does not fall back to parsing sys.argv, which inside a Jupyter kernel holds
# the kernel's own launch arguments rather than pipeline flags.
pipeline_options = PipelineOptions(
    flags=[],
    # Which runner executes the job, and under which project / region / name.
    runner='DirectRunner',
    project='andrii-edu-bigquery',
    region='us-central1',
    job_name='etl-pipeline',
    # GCS scratch space for temporary files and staged artifacts.
    temp_location='gs://airbnb-educative-source-data/tmp/',
    staging_location='gs://airbnb-educative-source-data/staging',
    # Worker sizing. These are DataflowRunner settings and presumably have no
    # effect under DirectRunner; kept so switching runners is a one-line change.
    machine_type='e2-standard-2',
    num_workers=1,
    max_num_workers=1,
    # NOTE(review): False works in-process; a remote runner may need True so
    # notebook-level names used by the transforms below are available to workers.
    save_main_session=False,
)

In [3]:
# Pipeline object that the following cells attach transforms to; the graph is
# only executed when mypipeline.run() is called (Task 15).
mypipeline = beam.Pipeline(options=pipeline_options)

Task 7: Read Data from GCS Bucket in the Pipeline

In [4]:
# Point Google client libraries (Beam's GCS/BigQuery IO and bigquery.Client)
# at the service-account key file. setdefault keeps a credentials path that is
# already configured in the environment instead of silently overwriting it.
# The trailing ';' suppresses the notebook echo of setdefault's return value.
os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", "/usercode/etl-pipeline-key.json");

In [5]:
# Location of the raw Airbnb listings CSV in Cloud Storage.
gcs_bucket = 'airbnb-educative-source-data'
csv_file_path = 'airbnb_data.csv'
gcs_path = 'gs://{}/{}'.format(gcs_bucket, csv_file_path)

In [6]:
# Read the CSV as raw text lines, dropping the header row; each element is one
# line and is split into fields in Task 8.
# NOTE(review): line-based reading assumes no quoted field contains a newline.
csv_data = mypipeline | 'Read CSV' >> beam.io.ReadFromText(
    gcs_path,
    skip_header_lines=1,
)



Task 8: Parse the Input Data

In [7]:
class AirbnbData(NamedTuple):
    """One Airbnb listing record as it flows through the pipeline.

    The fields without defaults (``id`` through ``license``) are the source
    CSV columns. They are filled positionally from a parsed CSV line, so their
    order must match the CSV header. They are kept as strings; later steps
    convert them with int()/float() where a number is needed. The defaulted
    fields are derived by the enrichment steps.
    """
    # --- Source CSV columns (order matters: populated positionally) ---
    id: str
    name: str
    host_id: str
    host_name: str
    neighbourhood_group: str
    neighbourhood: str
    latitude: str
    longitude: str
    room_type: str
    price: str
    minimum_nights: str
    number_of_reviews: str
    last_review: str
    reviews_per_month: str
    calculated_host_listings_count: str
    availability_365: str
    number_of_reviews_ltm: str
    license: str
    # --- Derived fields, filled in by later transforms ---
    price_type: str = ''  # price bucket: 'economic', 'midrange' or 'luxury'
    neighborhood_avg_price: float = 0  # mean price of the row's neighbourhood, rounded to 2 dp
    price_usd: float = 0  # float(price) multiplied by a fixed conversion rate
    # NOTE(review): the lr_* fields are annotated str, but the date-extraction
    # step stores ints here when last_review is present ('' otherwise).
    lr_year: str = ''
    lr_month: str = ''
    lr_day: str = ''

In [8]:
def parse_csv_line(line):
    """Parse one raw CSV line into an AirbnbData record.

    The csv module is used so quoted fields containing commas split correctly.
    Only the CSV-backed fields of AirbnbData (those without a default) are
    filled; the derived fields keep their defaults.

    Args:
        line: one line of text from the source CSV (header already skipped).

    Yields:
        A single AirbnbData when the line has exactly the expected number of
        columns, nothing otherwise. Malformed lines are counted in the
        'parse/malformed_csv_lines' metric. Without this check, a short line
        raises TypeError and aborts the whole pipeline, and a long line spills
        CSV text into the derived fields.
    """
    expected_columns = len(AirbnbData._fields) - len(AirbnbData._field_defaults)
    # csv.reader always yields one row for a one-element input; the default is
    # purely defensive.
    fields = next(csv.reader([line]), [])
    if len(fields) != expected_columns:
        beam.metrics.Metrics.counter('parse', 'malformed_csv_lines').inc()
        return
    yield AirbnbData(*fields)


parsed_data = (
    csv_data
    | 'Parse CSV' >> beam.FlatMap(parse_csv_line)
)

Task 9: Define Data Validation Functions in the Pipeline

In [9]:
def _parse_float(value):
    """Return ``value`` as a float, or None when it is blank/None or not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def has_valid_price(row):
    """Keep rows whose price parses as a number and is not negative.

    Parsing with float() instead of int() accepts decimal prices such as
    '120.50'. Blank or non-numeric prices are dropped rather than raising and
    aborting the whole pipeline.
    """
    price = _parse_float(row.price)
    return price is not None and price >= 0


def has_valid_coordinates(row):
    """Keep rows whose latitude/longitude parse and fall within valid ranges."""
    latitude = _parse_float(row.latitude)
    longitude = _parse_float(row.longitude)
    if latitude is None or longitude is None:
        return False
    return -90.0 <= latitude <= 90.0 and -180.0 <= longitude <= 180.0


filtered_data = (
    parsed_data
    # The original check was `>= 0`, so zero prices are kept; the label now
    # says so.
    | 'Filter non-negative prices' >> beam.Filter(has_valid_price)
    | 'Filter abnormal coordinates' >> beam.Filter(has_valid_coordinates)
    | 'Filter non-null host IDs' >> beam.Filter(lambda row: row.host_id.strip() != '')
)

Task 10: Define Data Cleaning Functions in the Pipeline

In [10]:
# Drop records that are exact duplicates across every field.
deduplicated_data = filtered_data | 'Remove duplicates' >> beam.Distinct()

In [11]:
# Replace commas in the listing name with underscores; only `name` is touched,
# and _replace returns a new record rather than mutating the input.
special_char_replaced = deduplicated_data | 'Replace comma' >> beam.Map(
    lambda record: record._replace(name=record.name.replace(',', '_'))
)

In [12]:
def impute_values(row):
    """Return a copy of ``row`` with blank optional fields filled in.

    A value counts as blank when it is the empty string or None. A blank
    ``neighbourhood`` becomes 'not available'. Blank ``availability_365``,
    ``reviews_per_month`` and ``number_of_reviews_ltm`` become '0'. Non-blank
    values and all other fields pass through unchanged.
    """
    fallbacks = (
        ('neighbourhood', 'not available'),
        ('availability_365', '0'),
        ('reviews_per_month', '0'),
        ('number_of_reviews_ltm', '0'),
    )
    updates = {}
    for field_name, fallback in fallbacks:
        current = getattr(row, field_name)
        updates[field_name] = fallback if current in ('', None) else current
    return row._replace(**updates)

class ImputeValues(beam.DoFn):
    """DoFn that emits impute_values(element) for every input element."""

    def process(self, element):
        # Exactly one output record per input record.
        yield impute_values(element)

In [13]:
# Fill blank optional fields on every record via the ImputeValues DoFn.
cleaned_data = special_char_replaced | 'Impute values' >> beam.ParDo(ImputeValues())

Task 11: Define Data Enrichment Functions in the Pipeline

In [14]:
# Fixed factor applied to `price` to obtain price_usd.
# NOTE(review): the neighbourhoods in the results (Banyule, Bayside) are
# Melbourne areas, so this is presumably an AUD->USD rate; confirm the source
# currency and the date of the rate.
PRICE_TO_USD_RATE = 0.66


def classify_price(price):
    """Bucket a numeric price: below 50 'economic', below 150 'midrange', else 'luxury'."""
    if price < 50:
        return 'economic'
    if price < 150:
        return 'midrange'
    return 'luxury'


def split_last_review(row):
    """Fill lr_year/lr_month/lr_day from ``row.last_review`` (format YYYY-MM-DD).

    A blank (empty or None) last_review sets all three to ''. Otherwise the
    date is parsed once and its components are stored as ints.

    Raises:
        ValueError: if last_review is non-blank but not in YYYY-MM-DD form.
    """
    if row.last_review in ('', None):
        return row._replace(lr_year='', lr_month='', lr_day='')
    reviewed_on = datetime.strptime(row.last_review, '%Y-%m-%d')
    return row._replace(
        lr_year=reviewed_on.year,
        lr_month=reviewed_on.month,
        lr_day=reviewed_on.day,
    )


enriched_data = (
    cleaned_data
    # Price is parsed with float(), the same conversion used for price_usd, so
    # decimal prices such as '120.50' are classified instead of raising in int().
    | 'Classify PriceType' >> beam.Map(
        lambda row: row._replace(price_type=classify_price(float(row.price)))
    )
    | 'Convert to USD' >> beam.Map(
        lambda row: row._replace(price_usd=float(row.price) * PRICE_TO_USD_RATE)
    )
    | 'Extract date components' >> beam.Map(split_last_review)
)

Task 12: Perform Aggregation Operations

In [15]:
# Mean price per neighbourhood, as (neighbourhood, avg_price) pairs.
# Mean.PerKey is a combiner, so Beam can pre-aggregate partial sums and counts
# before the shuffle. GroupByKey + sum/len instead had to materialize every
# price of a neighbourhood in one place.
avg_price_per_neighborhood = (
    enriched_data
    | 'KeyBy Neighborhood' >> beam.Map(lambda row: (row.neighbourhood, float(row.price)))
    | 'Compute avg price' >> beam.combiners.Mean.PerKey()
)

In [16]:
def add_neighbourhood_avg_price(row, avg_price_dict):
    """Attach the row's neighbourhood average price, rounded to 2 dp (0 if absent)."""
    neighbourhood_avg = avg_price_dict.get(row.neighbourhood, 0)
    return row._replace(neighborhood_avg_price=round(neighbourhood_avg, 2))


# The per-neighbourhood averages are passed as a dict side input, available
# whole to every element of enriched_data.
enriched_data_with_avg_price = enriched_data | 'Add NeighborhoodAvgPrice' >> beam.Map(
    add_neighbourhood_avg_price,
    avg_price_dict=beam.pvalue.AsDict(avg_price_per_neighborhood),
)

Task 13: Create BigQuery Dataset

In [17]:
# BigQuery client constructed with no explicit project or credentials; it
# picks them up from the environment (the key path is set in Task 7).
client = bigquery.Client()

In [18]:
# Fully qualified dataset ID in "project.dataset" form.
dataset_id = "andrii-edu-bigquery.airbnb_ds"

In [19]:
# Local Dataset description only; nothing is created in BigQuery until
# client.create_dataset is called.
dataset = bigquery.Dataset(dataset_id)

In [20]:
# Dataset location, set before creation because a dataset's location cannot
# be changed afterwards.
dataset.location = "US"

In [21]:
# Create the dataset. exists_ok=True makes re-running this cell safe: if the
# dataset already exists, it is returned as-is instead of raising.
dataset = client.create_dataset(dataset, exists_ok=True) 

Task 14: Upload Final Data to BigQuery Table

In [22]:
# Destination table in "project:dataset.table" form, and the GCS prefix passed
# to WriteToBigQuery as custom_gcs_temp_location for its temporary files.
output_table_spec = 'andrii-edu-bigquery:airbnb_ds.airbnb_tb'
gcs_temp_location = 'gs://airbnb-educative-source-data/bigquery_temp/'

In [23]:
# BigQuery sink: the schema is autodetected, the table is created when missing,
# and its contents are replaced (WRITE_TRUNCATE) on every run.
# NOTE(review): lr_year/lr_month/lr_day mix '' and ints across rows; confirm the
# autodetected column types are what downstream queries expect.
bigquery_sink = WriteToBigQuery(
    output_table_spec,
    schema='SCHEMA_AUTODETECT',
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    custom_gcs_temp_location=gcs_temp_location,
)

# Records become plain dicts (field name -> value) before being written.
(
    enriched_data_with_avg_price
    | 'Convert to Dict' >> beam.Map(lambda record: record._asdict())
    | 'Write to BigQuery' >> bigquery_sink
)

<apache_beam.io.gcp.bigquery.WriteResult at 0x73521a630df0>

Task 15: Execute the Pipeline

In [24]:
# Execute the pipeline graph built above and block until it finishes; the cell
# displays the terminal state string returned by wait_until_finish().
result = mypipeline.run()
result.wait_until_finish()



'DONE'

In [25]:
# Final pipeline state, printed for the record.
state = result.state
print(state)

DONE


Task 16: Create Views

In [26]:
# Fully qualified ID ("project.dataset.view") of the analysis view.
view_id = "andrii-edu-bigquery.airbnb_ds.airbnb_analysis_vw"

In [27]:
# Local Table object for the view; it is defined as a view by setting
# view_query and is created in BigQuery by client.create_table.
view_ref = bigquery.Table(view_id)

In [28]:
# View SQL: price statistics per (neighbourhood, price_type).
# neighborhood_avg_price holds one value per neighbourhood, so its MIN and MAX
# are equal within a group; both are reported for visibility.
# NOTE(review): the table reference is not project-qualified; confirm it still
# resolves as intended if the view is queried from another project.
view_ref.view_query = """
SELECT
  neighbourhood,
  price_type,
  ROUND(AVG(price), 2) AS avg_price,
  ROUND(MIN(price), 2) AS min_price,
  ROUND(MAX(price), 2) AS max_price,
  MIN(neighborhood_avg_price) AS min_neighbourhood_avg_price,
  MAX(neighborhood_avg_price) AS max_neighbourhood_avg_price
FROM airbnb_ds.airbnb_tb
GROUP BY neighbourhood, price_type 
ORDER BY neighbourhood, price_type;
"""

In [29]:
# Create the view, or bring an existing one up to date with the SQL above.
# create_table(exists_ok=True) alone returns an existing view unchanged, so
# edits to view_query would otherwise be silently ignored on re-runs.
view = client.create_table(view_ref, exists_ok=True)
if view.view_query != view_ref.view_query:
    # Patch only the view definition; other view metadata is left untouched.
    view = client.update_table(view_ref, ["view_query"])

Task 17: Validate the Results

In [30]:
# Sanity check: number of rows loaded into the target table.
!bq query --use_legacy_sql=false "SELECT count(*) FROM airbnb_ds.airbnb_tb"

[1m[31mBQ CLI will soon require all users to log in using `gcloud auth login`. `bq
init` will no longer handle authentication after January 1, 2024.[0m

Welcome to BigQuery! This script will walk you through the 
process of initializing your .bigqueryrc configuration file.

First, we need to set up your credentials if they do not 
already exist.

Setting project_id andrii-edu-bigquery as the default.

BigQuery configuration complete! Type "bq" to get started.

+-------+
|  f0_  |
+-------+
| 23171 |
+-------+


In [31]:
# Spot-check up to 10 rows returned by the analysis view.
!bq query --use_legacy_sql=false "SELECT * FROM airbnb_ds.airbnb_analysis_vw LIMIT 10"

+---------------+------------+-----------+-----------+-----------+-----------------------------+-----------------------------+
| neighbourhood | price_type | avg_price | min_price | max_price | min_neighbourhood_avg_price | max_neighbourhood_avg_price |
+---------------+------------+-----------+-----------+-----------+-----------------------------+-----------------------------+
| Banyule       | economic   |     39.06 |      15.0 |      49.0 |                      158.01 |                      158.01 |
| Banyule       | luxury     |     270.9 |     150.0 |    1200.0 |                      158.01 |                      158.01 |
| Banyule       | midrange   |     95.36 |      50.0 |     149.0 |                      158.01 |                      158.01 |
| Bayside       | economic   |     34.33 |      25.0 |      40.0 |                      380.25 |                      380.25 |
| Bayside       | luxury     |    515.66 |     150.0 |   11429.0 |                      380.25 |               

Task 20: Validating the Pipeline Results