In [2]:
import os
from dotenv import load_dotenv
import pandas as pd
from google.cloud import bigquery
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions
import gcsfs 
from typing import Union, List





In [16]:
# Load environment variables from .env
load_dotenv()

GCS_RAW_PREFIX = os.getenv("GCS_RAW_PREFIX")
GCS_STAGING_PATH = os.getenv("GCS_STAGING_PATH")
BQ_TABLE_ID = os.getenv("BQ_TABLE_ID")
GOOGLE_CLOUD_PROJECT = os.getenv("GOOGLE_CLOUD_PROJECT")

In [17]:
def read_path(gcs_path:str) -> pd.DataFrame: 
    fs = gcsfs.GCSFileSystem(project=GOOGLE_CLOUD_PROJECT)
    files = fs.glob(f"{gcs_path}/*.csv")
    return files

In [18]:
PATHS = read_path(GCS_RAW_PREFIX)

In [19]:
PATHS

['etl-portfolio-raw/2024_Q4_airbnb_berlin.csv',
 'etl-portfolio-raw/2025_Q1_airbnb_berlin.csv',
 'etl-portfolio-raw/2025_Q2_airbnb_berlin.csv',
 'etl-portfolio-raw/2025_Q3_airbnb_berlin.csv']

In [20]:
def read_data(gcs_path: Union[str, List[str]]) -> pd.DataFrame:

    fs = gcsfs.GCSFileSystem(project=GOOGLE_CLOUD_PROJECT)
    
    if isinstance(gcs_path, str):
        paths = [gcs_path]
    else:
        paths = gcs_path

    # Read all CSVs and concatenate
    df_list = []
    for path in paths:
        with fs.open(path, "rb") as f:
            df_list.append(pd.read_csv(f))
    
    return df_list


In [21]:
data = read_data(PATHS)

In [27]:
data[2>]

SyntaxError: invalid syntax (2323925041.py, line 1)

In [13]:
data

Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365,number_of_reviews_ltm,license
0,3176,Fabulous Flat in great Location,3718,Britta,Pankow,Prenzlauer Berg Südwest,52.534710,13.418100,Entire home/apt,95.0,63,148,2023-05-25,0.78,1,293,0,First name and Last name: Nicolas Krotz <br/> ...
1,9991,Geourgeous flat - outstanding views,33852,Philipp,Pankow,Prenzlauer Berg Südwest,52.532690,13.418050,Entire home/apt,180.0,6,7,2020-01-04,0.06,1,43,0,03/Z/RA/003410-18
2,14325,Studio Apartment in Prenzlauer Berg,55531,Chris + Oliver,Pankow,Prenzlauer Berg Nordwest,52.548130,13.403660,Entire home/apt,75.0,150,26,2023-11-30,0.15,4,91,0,
3,16644,In the Heart of Berlin - Kreuzberg,64696,Rene,Friedrichshain-Kreuzberg,nördliche Luisenstadt,52.503120,13.435080,Entire home/apt,90.0,93,48,2017-12-14,0.27,2,111,0,
4,17904,Beautiful Kreuzberg studio - 3 months minimum,68997,Matthias,Neukölln,Reuterstraße,52.494190,13.421660,Entire home/apt,28.0,92,299,2022-12-01,1.65,1,29,0,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
13979,1314431750555958855,Trung tâm quận Wedding,667407129,Lisa,Mitte,Wedding Zentrum,52.550847,13.361652,Entire home/apt,80.0,1,0,,,2,352,0,
13980,1314778665911489677,Wunderschönes Zimmer im Zentrum,650206258,Gihad,Mitte,Regierungsviertel,52.515988,13.380331,Private room,760.0,1,0,,,2,365,0,
13981,1315256184055453744,Modernes Apartment im Neuköllner Kiez,119577461,Mikel,Neukölln,Neuköllner Mitte/Zentrum,52.464220,13.437610,Entire home/apt,119.0,1,0,,,11,89,0,02/Z/ZA/746854-32
13982,1315368682473897294,1 Room Loft Studio Apartment,46882657,Teresa,Friedrichshain-Kreuzberg,Frankfurter Allee Süd FK,52.514735,13.459341,Entire home/apt,52.0,92,0,,,1,365,0,


In [12]:
data.isnull().sum()

id                                   0
name                                 0
host_id                              0
host_name                            9
neighbourhood_group                  0
neighbourhood                        0
latitude                             0
longitude                            0
room_type                            0
price                             4994
minimum_nights                       0
number_of_reviews                    0
last_review                       3278
reviews_per_month                 3278
calculated_host_listings_count       0
availability_365                     0
number_of_reviews_ltm                0
license                           5157
dtype: int64

In [10]:
# show ANY row containing the ID digits
data[data.astype(str).apply(lambda row: row.str.contains("Berlins finest", na=False)).any(axis=1)]



Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365,number_of_reviews_ltm,license
8502,642320854539602604,Luxury High End Apartment!\nBerlins finest,462618728,Bora,Mitte,Regierungsviertel,52.51128,13.37993,Entire home/apt,499.0,92,0,,,1,358,0,


In [39]:
from dataclasses import dataclass, fields
from typing import Optional

@dataclass
class Person:
    id: Optional[int]
    name: str
    age: int

In [40]:
field_types = {f.name: f.type for f in fields(Person)}

In [42]:
field_types

{'id': typing.Optional[int], 'name': str, 'age': int}

In [46]:
typ = str

In [47]:
typ("11")

'11'

In [7]:
import os
import json

def load_schema():
    # Use __file__ if available, otherwise fall back for Jupyter
    base_path = os.path.dirname(__file__) if "__file__" in globals() else os.getcwd()

    schema_path = os.path.join(
        base_path,
        "..",    # go up from dataflow_job → airbnb_etl
        "..",    # go up from airbnb_etl → project root
        "infra",
        "beam",
        "schemas",
        "stg_airbnb_listings.json"
    )

    schema_path = os.path.abspath(schema_path)  # normalize

    print(f"Loading schema from: {schema_path}")

    with open(schema_path, "r") as f:
        return json.load(f)


In [8]:
load_schema()

Loading schema from: /Users/fstankat/Projects/airbnb-etl-project/airbnb-etl-project/infra/beam/schemas/stg_airbnb_listings.json


[{'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
 {'name': 'name', 'type': 'STRING', 'mode': 'REQUIRED'},
 {'name': 'host_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
 {'name': 'host_name', 'type': 'STRING', 'mode': 'REQUIRED'},
 {'name': 'neighbourhood_group', 'type': 'STRING', 'mode': 'REQUIRED'},
 {'name': 'neighbourhood', 'type': 'STRING', 'mode': 'REQUIRED'},
 {'name': 'latitude', 'type': 'FLOAT', 'mode': 'REQUIRED'},
 {'name': 'longitude', 'type': 'FLOAT', 'mode': 'REQUIRED'},
 {'name': 'room_type', 'type': 'STRING', 'mode': 'REQUIRED'},
 {'name': 'price', 'type': 'FLOAT', 'mode': 'REQUIRED'},
 {'name': 'minimum_nights', 'type': 'INTEGER', 'mode': 'REQUIRED'},
 {'name': 'number_of_reviews', 'type': 'INTEGER', 'mode': 'REQUIRED'},
 {'name': 'last_review', 'type': 'DATE', 'mode': 'REQUIRED'},
 {'name': 'reviews_per_month', 'type': 'FLOAT', 'mode': 'REQUIRED'},
 {'name': 'calculated_host_listings_count',
  'type': 'INTEGER',
  'mode': 'REQUIRED'},
 {'name': 'availability_365'