# ETL tutorial

In [None]:
!pip install --upgrade google-cloud-bigquery

In [None]:
!pip install pandas-gbq

In [None]:
!pip install db-dtypes

In [None]:
import pymongo

import pandas as pd
import matplotlib.pyplot as plt

from google.cloud import bigquery
from google.oauth2 import service_account

### Extracting data

In [None]:
# Names of the MongoDB database and collection that the extraction cells
# below read from.
DB_NAME = "sample_airbnb"
COLLECTION = "listingsAndReviews"

In [None]:
# Connect to the MongoDB cluster, requesting server API version "1".
# Replace the <password> placeholder in the URI with the real password of the
# database user before running this cell.
client = pymongo.MongoClient(
    "mongodb+srv://fabiancpl:<password>@cluster0.ys2ybhm.mongodb.net/?retryWrites=true&w=majority",
    server_api=pymongo.server_api.ServerApi("1"),
)

In [None]:
# Handle to the DB_NAME database on the connected MongoDB client.
db = client[DB_NAME]

In [None]:
# Handle to the COLLECTION collection inside `db`.
col = db[COLLECTION]

In [None]:
# Query every document (empty filter) but project only the fields the rest of
# the notebook uses; "_id": 0 leaves the document id out of the result.
result = col.find(
    filter={},
    projection={
        "_id": 0,
        "address.country": 1,
        "address.market": 1,
        "price": 1,
    },
)

In [None]:
# Flatten the nested documents into a table (nested keys become dotted column
# names), then give the two address columns short names.
df = (
    pd.json_normalize(result)
    .rename(
        columns={
            "address.country": "country",
            "address.market": "market",
        }
    )
)

In [None]:
# (number of rows, number of columns) of the extracted data.
df.shape

In [None]:
# Column dtypes as extracted; `price` is converted to float in a later cell.
df.dtypes

In [None]:
# Peek at 5 randomly chosen rows.
df.sample(5)

### Exploring and cleaning data

In [None]:
# Number of listings per country; dropna = False also shows a row for missing values.
df["country"].value_counts(dropna = False)

In [None]:
# Listings with a blank market carry no usable market name, so label them "Other".
# Missing values (NaN/None) get the same label: left as nulls they would be
# silently dropped by the later groupby on ["country", "market"], which
# excludes null keys by default.
df.loc[
    df["market"].isna() | (df["market"] == ""),
    "market",
] = "Other"

In [None]:
# Number of listings per market after the clean-up above; dropna = False also
# shows a row for missing values.
df["market"].value_counts(dropna = False)

In [None]:
# Convert price to float by way of its string form.
# NOTE(review): presumably the extracted values are not plain Python numbers
# (e.g. a BSON decimal type), which is why the cast goes through str - confirm
# against the dtypes shown earlier.
df["price"] = (
    df["price"]
    .astype(str)
    .astype(float)
)

In [None]:
# Horizontal box plot of the price distribution on a wide, short canvas.
plt.figure(figsize=(20, 3))
plt.boxplot(x=df["price"], vert=False)
plt.show()

### Transforming data

In [None]:
# Helper column of ones; the aggregation in the next cell counts it per group
# to obtain the number of listings.
df["count"] = 1

In [None]:
# One row per (country, market) pair with the number of listings (`count`)
# and the median listing price (`price_median`).
df_grouped = (
    df.groupby(["country", "market"])
    .agg(
        count=("count", "count"),
        price_median=("price", "median"),
    )
    .reset_index()
)

In [None]:
# Peek at 5 randomly chosen rows of the aggregated table.
df_grouped.sample(5)

### Loading data

In [None]:
# Load service-account credentials from the local JSON key file and request
# the cloud-platform OAuth scope.
credentials = service_account.Credentials.from_service_account_file(
    "./javeriana-dataprep.json",
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

In [None]:
# BigQuery client for the project recorded in the service-account key.
# NOTE: this rebinds `client`, which until this cell held the MongoDB
# connection; from here on `client` is the BigQuery client.
client = bigquery.Client(
    project=credentials.project_id,
    credentials=credentials,
)

In [None]:
# Load-job settings: an explicit schema for the destination table plus the
# rule for what to do when that table already exists.
job_config = bigquery.LoadJobConfig(
    schema=[
        # Supported datatypes: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
        bigquery.SchemaField(column_name, column_type)
        for column_name, column_type in (
            ("country", bigquery.enums.SqlTypeNames.STRING),
            ("market", bigquery.enums.SqlTypeNames.STRING),
            ("count", bigquery.enums.SqlTypeNames.INT64),
            ("price_median", bigquery.enums.SqlTypeNames.FLOAT64),
        )
    ],
    # Overwrite the table's existing contents if the table already exists.
    write_disposition="WRITE_TRUNCATE",
)

In [None]:
# Destination table id, written as <dataset>.<table>; used by both the load
# job and the verification lookup in the following cells.
BQ_TABLE_NAME = "dataprep.listings_country_market"

In [None]:
# Submit the aggregated DataFrame to BigQuery as a load job ...
job = client.load_table_from_dataframe(
    dataframe=df_grouped,
    destination=BQ_TABLE_NAME,
    job_config=job_config,
)

# ... and block until the job has finished.
job.result()

In [None]:
# Fetch the table's metadata to confirm the load created or updated it.
table = client.get_table(BQ_TABLE_NAME)

print(f"Loaded {table.num_rows} rows and {len(table.schema)} columns to {BQ_TABLE_NAME}")

In [None]:
# Read the freshly loaded table back as a DataFrame to check the load end to end.
query = """SELECT * FROM `javeriana-dataprep.dataprep.listings_country_market`"""

# pd.read_gbq is deprecated since pandas 2.2, so run the query through the
# BigQuery client created earlier in this notebook (it was built from the same
# service-account credentials) and convert the result to a DataFrame.
client.query(query).to_dataframe()