0. Install the dependency

In [None]:
!pip install google-cloud-bigtable==1.2.0

Collecting google-cloud-bigtable==1.2.0
[?25l  Downloading https://files.pythonhosted.org/packages/35/23/88d98cef6845422ecbe3d16491acc9341cd3bd503fa6938376c5ad7f0c2b/google_cloud_bigtable-1.2.0-py2.py3-none-any.whl (234kB)
[K     |█▍                              | 10kB 16.2MB/s eta 0:00:01[K     |██▉                             | 20kB 17.8MB/s eta 0:00:01[K     |████▏                           | 30kB 15.6MB/s eta 0:00:01[K     |█████▋                          | 40kB 12.4MB/s eta 0:00:01[K     |███████                         | 51kB 8.8MB/s eta 0:00:01[K     |████████▍                       | 61kB 8.1MB/s eta 0:00:01[K     |█████████▊                      | 71kB 9.1MB/s eta 0:00:01[K     |███████████▏                    | 81kB 10.0MB/s eta 0:00:01[K     |████████████▌                   | 92kB 8.9MB/s eta 0:00:01[K     |██████████████                  | 102kB 8.4MB/s eta 0:00:01[K     |███████████████▍                | 112kB 8.4MB/s eta 0:00:01[K     |███████████

1. Import necessary modules

In [None]:
from google.cloud import bigquery, bigtable
from google.colab import auth
auth.authenticate_user()

# Bigquery

2. Creating and getting necessary information from Bigquery

In [30]:
bqc = bigquery.Client(project="lateral-array-281406")

table_schema = [
                bigquery.SchemaField(
                    "date",
                    field_type="TIMESTAMP"
                ),
                bigquery.SchemaField(
                    "covid_count",
                    field_type="RECORD",
                    fields = [
                              bigquery.SchemaField(
                                  "country",
                                  field_type="STRING"
                              ),
                              bigquery.SchemaField(
                                  "province",
                                  field_type="STRING"
                              ),
                              bigquery.SchemaField(
                                  name="region",
                                  field_type="STRING"
                              ),
                              bigquery.SchemaField(
                                  "confirmed_case",
                                  field_type="NUMERIC"
                              )
                    ]
                )
]
italy_covid_cases = bigquery.Table(
    bigquery.TableReference(
        bigquery.DatasetReference(
          "lateral-array-281406",
          "dev"  
        ),
        "italy_covid_cases"
    ),
    schema=table_schema
)

table_ref = None

try:
  table_ref = bqc.get_table(italy_covid_cases)
except Exception as ex:
  print(f"Table {italy_covid_cases.dataset_id}.{italy_covid_cases.table_id} not found. ")

if table_ref is None:
  print(f"Attempting to create new table.")

bqc.create_table(italy_covid_cases, exists_ok=True)
print("Table has been created")

Table dev.italy_covid_cases not found. 
Attempting to create new table.
Table has been created


3. Getting table details and data 

In [31]:
%%bigquery --project lateral-array-281406
select * from dev.italy_covid_cases

Unnamed: 0,date,covid_count


4. Populating dummy data

In [32]:
%%bigquery --project lateral-array-281406
insert into dev.italy_covid_cases
SELECT
  date,
  STRUCT(country,
   region_name,
    province_name,
    confirmed_cases ) as covid_count
FROM
  `bigquery-public-data.covid19_italy.data_by_province`

5. Verify the data

In [33]:
%%bigquery --project lateral-array-281406
select * from dev.italy_covid_cases limit 100

Unnamed: 0,date,covid_count
0,2020-03-26 17:00:00+00:00,"{'country': 'ITA', 'province': 'Lazio', 'regio..."
1,2020-05-23 17:00:00+00:00,"{'country': 'ITA', 'province': 'Lazio', 'regio..."
2,2020-06-17 17:00:00+00:00,"{'country': 'ITA', 'province': 'Lazio', 'regio..."
3,2020-07-04 17:00:00+00:00,"{'country': 'ITA', 'province': 'Lazio', 'regio..."
4,2020-09-07 17:00:00+00:00,"{'country': 'ITA', 'province': 'Lazio', 'regio..."
...,...,...
95,2020-12-12 17:00:00+00:00,"{'country': 'ITA', 'province': 'Lazio', 'regio..."
96,2021-01-06 17:00:00+00:00,"{'country': 'ITA', 'province': 'Lazio', 'regio..."
97,2021-01-22 17:00:00+00:00,"{'country': 'ITA', 'province': 'Lazio', 'regio..."
98,2021-01-24 17:00:00+00:00,"{'country': 'ITA', 'province': 'Lazio', 'regio..."


<google.cloud.bigquery.table.RowIterator at 0x7fe1ff0db390>

# Bigtable

1. Create a bigtable instance

In [92]:
from google.cloud.bigtable import Client
from google.cloud.bigtable import enums

UNIQUE_SUFFIX = "poc"

my_instance_id = "myinstance-" + UNIQUE_SUFFIX
my_cluster_id = "mycluster-" + UNIQUE_SUFFIX
location_id = "us-central1-c"
storage_type = enums.StorageType.HDD
dev = enums.Instance.Type.DEVELOPMENT

client = Client(admin=True, project="lateral-array-281406")
instance = client.instance(my_instance_id, instance_type=dev)
cluster = instance.cluster(
    my_cluster_id,
    location_id=location_id,
    default_storage_type=storage_type,
)
operation = instance.create(clusters=[cluster])

# We want to make sure the operation completes.
operation.result(timeout=100)

name: "projects/lateral-array-281406/instances/myinstance-poc"
display_name: "myinstance-poc"
state: READY
type: DEVELOPMENT

2. Create table

In [94]:
instance.table("italy_covid_cases").create()

In [95]:
from google.cloud.bigtable import column_family

instance.table("italy_covid_cases").column_family(
"covid_cases", column_family.MaxVersionsGCRule(2)
).create()
italy_covid_cases = instance.table("italy_covid_cases")

# Bigquery to Bigtable

In [None]:
from datetime import datetime
rows = []
row_key_d = dict()
job = bqc.query("select * from dev.italy_covid_cases limit 10000")
for rec in job.result():
  row_key = f"{rec.covid_count['province']}#{rec.covid_count['country']}#{rec.covid_count['region']}#{rec.date.isoformat()}".encode('utf-8')
  row = italy_covid_cases.direct_row(row_key)
  row.set_cell(
      "covid_cases",
      "country".encode(),
      rec.covid_count['country'].encode('utf-8'),
      timestamp = datetime.utcnow()   
  )
  row.set_cell(
      "covid_cases",
      "province".encode(),
      rec.covid_count['province'].encode('utf-8'),
      timestamp = datetime.utcnow()   
  )
  row.set_cell(
      "covid_cases",
      "region".encode(),
      rec.covid_count['region'].encode('utf-8'),
      timestamp = datetime.utcnow()   
  )
  row.set_cell(
      "covid_cases",
      "confirmed_case".encode(),
      str(rec.covid_count['confirmed_case']).encode('utf-8'),
      timestamp = datetime.utcnow()   
  )
  rows.append(row)
  if row_key in row_key_d.keys():
    row_key_d[row_key] += 1
  else:
    row_key_d[row_key] =1

italy_covid_cases.mutate_rows(rows)


In [124]:
for row in italy_covid_cases.read_rows(limit=100):
  print(row.row_key, row.to_dict())

b'Lazio#2020-03-26T17:00:00+00:00' {b'covid_cases:confirmed_case': [<Cell value=b'1567' timestamp=2021-05-08 14:05:37.002000+00:00>], b'covid_cases:country': [<Cell value=b'ITA' timestamp=2021-05-08 14:05:37.002000+00:00>], b'covid_cases:province': [<Cell value=b'Lazio' timestamp=2021-05-08 14:05:37.002000+00:00>], b'covid_cases:region': [<Cell value=b'Roma' timestamp=2021-05-08 14:05:37.002000+00:00>]}
b'Lazio#2020-05-23T17:00:00+00:00' {b'covid_cases:confirmed_case': [<Cell value=b'5549' timestamp=2021-05-08 14:05:37.002000+00:00>], b'covid_cases:country': [<Cell value=b'ITA' timestamp=2021-05-08 14:05:37.002000+00:00>], b'covid_cases:province': [<Cell value=b'Lazio' timestamp=2021-05-08 14:05:37.002000+00:00>], b'covid_cases:region': [<Cell value=b'Roma' timestamp=2021-05-08 14:05:37.002000+00:00>]}
b'Lazio#2020-06-17T17:00:00+00:00' {b'covid_cases:confirmed_case': [<Cell value=b'5866' timestamp=2021-05-08 14:05:37.003000+00:00>], b'covid_cases:country': [<Cell value=b'ITA' timestam