This example is based on the [DVD Rental](https://www.postgresqltutorial.com/postgresql-sample-database/) database.

You can find a basic pandas tutorial [here](https://github.com/datascience-unbosque/pandas-tutorial).

In [None]:
!pip install pyarrow
!pip install google-cloud-bigquery

In [None]:
import psycopg2
import pandas as pd

from google.cloud import bigquery
from google.oauth2 import service_account

### Extracting data from PostgreSQL

In [None]:
# Open a connection to the dvdrental database on the local PostgreSQL server.
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    database="dvdrental",
    user="postgres",
)

In [None]:
# Pull the whole rental table into a DataFrame. The query is executed through
# the connection object passed to pandas, so no explicit cursor is needed.
rentals = pd.read_sql_query("SELECT * FROM public.rental", conn)

In [None]:
# Count rentals per film title, most-rented first. The query is executed
# through the connection object passed to pandas, so no explicit cursor is
# needed.
rentals_by_movie = pd.read_sql_query("""
    SELECT f.title AS film, COUNT(r.rental_id) AS num_rentals
    FROM public.film f 
    RIGHT JOIN public.inventory i 
        ON f.film_id = i.film_id
    RIGHT JOIN public.rental r
        ON i.inventory_id = r.inventory_id
    GROUP BY f.title
    ORDER BY num_rentals DESC
""", conn)

In [None]:
# Release the PostgreSQL connection now that the extraction queries are done.
conn.close()

In [None]:
# Show the (rows, columns) dimensions of the rentals DataFrame.
rentals.shape

In [None]:
# Show the dtype pandas assigned to each column of the rentals DataFrame.
rentals.dtypes

In [None]:
# Preview the first rows of the rentals DataFrame.
rentals.head()

In [None]:
# Show the (rows, columns) dimensions of the per-film rental counts.
rentals_by_movie.shape

In [None]:
# Show the dtype pandas assigned to each column of the per-film rental counts.
rentals_by_movie.dtypes

In [None]:
# Preview the first rows of the per-film rental counts.
rentals_by_movie.head()

### Transforming data

In [None]:
# Count rentals per calendar month: group on the year and month of
# rental_date, count rental_id in each group, then flatten the result into
# plain columns ordered chronologically.
rental_year = rentals["rental_date"].dt.year.rename("year")
rental_month = rentals["rental_date"].dt.month.rename("month")
rentals_by_month = (
    rentals.groupby([rental_year, rental_month])
    .agg({"rental_id": "count"})
    .reset_index()
    .rename(columns={"rental_id": "num_rentals"})
    .sort_values(by=["year", "month"])
)

In [None]:
# Display the monthly rental counts.
rentals_by_month

### Loading data to BigQuery

In [None]:
# Load service-account credentials from the JSON key file, requesting the
# cloud-platform OAuth scope.
key_file = "../../unbosque-service-account.json"
oauth_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
credentials = service_account.Credentials.from_service_account_file(
    key_file,
    scopes=oauth_scopes,
)

In [None]:
# Create the BigQuery client, taking the project from the service-account
# credentials themselves.
client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)

In [None]:
# Load-job configuration for the monthly rental counts.
# Supported datatypes: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
# All three columns (year, month, num_rentals) are loaded as INT64.
monthly_schema = [
    bigquery.SchemaField(column, bigquery.enums.SqlTypeNames.INT64)
    for column in ("year", "month", "num_rentals")
]
job_config = bigquery.LoadJobConfig(
    schema=monthly_schema,
    # WRITE_TRUNCATE: if the destination table already exists, overwrite it
    write_disposition="WRITE_TRUNCATE",
)

In [None]:
# Submit the load job for the monthly counts and block until it completes.
monthly_table_id = "etl_tutorial.rentals_by_month"
job = client.load_table_from_dataframe(
    rentals_by_month,
    monthly_table_id,
    job_config=job_config,
)
job.result()

In [None]:
# Fetch the destination table's metadata and report its row and column counts.
loaded_table_id = "etl_tutorial.rentals_by_month"
table = client.get_table(loaded_table_id)
row_count = table.num_rows
column_count = len(table.schema)
print("Loaded {} rows and {} columns to {}".format(row_count, column_count, loaded_table_id))

In [None]:
# Load-job configuration for the per-film rental counts.
# Supported datatypes: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
film_column = bigquery.SchemaField("film", bigquery.enums.SqlTypeNames.STRING)
count_column = bigquery.SchemaField("num_rentals", bigquery.enums.SqlTypeNames.INT64)
job_config = bigquery.LoadJobConfig(
    schema=[film_column, count_column],
    # WRITE_TRUNCATE: if the destination table already exists, overwrite it
    write_disposition="WRITE_TRUNCATE",
)

In [None]:
# Submit the load job for the per-film counts and block until it completes.
movie_table_id = "etl_tutorial.rentals_by_movie"
job = client.load_table_from_dataframe(
    rentals_by_movie,
    movie_table_id,
    job_config=job_config,
)
job.result()

In [None]:
# Fetch the destination table's metadata and report its row and column counts.
loaded_table_id = "etl_tutorial.rentals_by_movie"
table = client.get_table(loaded_table_id)
row_count = table.num_rows
column_count = len(table.schema)
print("Loaded {} rows and {} columns to {}".format(row_count, column_count, loaded_table_id))