<a id='theTop'></a>
## The Great Migration
This project aims to understand the journey of a data engineer who wants to build their "Hello, World!" on Astra.

#### The Data Engineer's "Hello, World!": 
Somewhere in the world 🌎, a data engineer coming to Astra will want to perform The Great Migration 🐘: moving their data from one cloud to Astra. For this example, we will see what this could be like for the dev moving data from GCP to Astra.

This notebook has the following 6 😱 Sections:
0. [Set-Up Stuff](#step0)

Then...
1. [Connect to GCP Big Query](#step1)
2. [ETL BQ Schema to Astra Payload](#step2)
3. [Create a table in Astra](#step3)
4. [ETL BQ Data to Astra Payloads](#step4)
5. [Insert the data in Astra](#step5)

Confirmation Step: Successfully query the data from Studio

<a id='step0'></a>
### 0. Set-Up
1. Python packages
2. Google Cloud SDK
3. Astra Credentials
4. [bonus] Connecting to BQ from Jupyter Notebooks

###### [back to the top](#theTop)

In [1]:
# Packages for Big Query
from google.cloud import bigquery # the google cloud SDK

# Packages for connecting to Astra
import os # for accessing creds
import requests # for making REST API requests
import uuid # to create UUIDs for Astra connections
import json # for converting json payloads to strings

# Packages for ETL
from copy import deepcopy # for schema ETL
import unittest # for writing tests as we go
import logging # for integrating into production codebase (instead of print())

# Packages for notebook stuff
from pprint import pprint # for pretty formatting
import warnings # for suppressing warnings in a notebook

warnings.filterwarnings('ignore')
logger = logging.getLogger()

0.2: Follow [these docs from Google](https://cloud.google.com/sdk/docs/quickstarts) on setting up your Cloud SDK.

In [2]:
# 0.3 Astra Credentials
UUID = str(uuid.uuid1())
USER = os.environ["ASTRA_USR"]      # NEVER store your creds directly in your code!
PASSWORD = os.environ["ASTRA_PSWD"] # NEVER store your creds directly in your code!
DB_ID = os.environ["ASTRA_DB"]      # NEVER store your creds directly in your code!
REGION = "us-east1"
KYSPC = "ds_metrics"
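
If one of the environment variables above is not set, `os.environ[...]` raises a bare `KeyError`. A minimal sketch of a friendlier check follows; the helper name `load_astra_credentials` and the placeholder values are assumptions for illustration, not part of the original notebook.

```python
import os

# The three variables the notebook reads in step 0.3
REQUIRED_VARS = ("ASTRA_USR", "ASTRA_PSWD", "ASTRA_DB")

def load_astra_credentials(env=None):
    """Return the Astra settings, or raise one error naming every missing variable."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}

# Usage with a stand-in mapping (placeholder values, not real credentials)
creds = load_astra_credentials(
    {"ASTRA_USR": "demo_user", "ASTRA_PSWD": "demo_pass", "ASTRA_DB": "demo-db-id"}
)
```

Calling `load_astra_credentials()` with no argument reads from the real environment.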

0.4 [bonus]: The Data Engineering team created [this video](https://datastax.zoom.us/rec/play/usAsdOH5pzk3TNGdtQSDV_F7W9XsK6-s1iQdrKcJyhvgVnkAYVfzZuNGMeN95iQWA_vuaqDra-Rd4eTq?continueMode=true&_x_zm_rtaid=arprVtbjSLKluh76R95SbQ.1594042537833.ccce8a5266f8f5fa88958ab4c2812e6d&_x_zm_rhtaid=167) on how to use Jupyter notebooks to connect to Big Query. 

<a id='step1'></a>
### 1. Connect to GCP Big Query

###### [back to the top](#theTop)

In [3]:
# Notebook Settings
logger.setLevel(logging.DEBUG)

In [4]:
client = bigquery.Client()
PROJECT = "aqueduct-production"
DATASET = "ds_metrics"
TABLE = "dse_new_customers_daily"

In [5]:
# getting schema
def get_bq_schema(project, dataset, table):
    """
        Return the column metadata for one BQ table as a df
    """
    
    # INFORMATION_SCHEMA.COLUMNS lists every column of every table in the dataset
    query = f'SELECT * FROM {project}.{dataset}.INFORMATION_SCHEMA.COLUMNS'
    df = client.query(query).to_dataframe()
    
    ## get the schema for only the table we want
    bq_schema = df[df["table_name"] == table]
    return bq_schema

In [6]:
bq_schema = get_bq_schema(PROJECT, DATASET, TABLE)
bq_schema

| | table_catalog | table_schema | table_name | column_name | ordinal_position | is_nullable | data_type | is_generated | generation_expression | is_stored | is_hidden | is_updatable | is_system_defined | is_partitioning_column | clustering_ordinal_position |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 112 | aqueduct-production | ds_metrics | dse_new_customers_daily | product | 1 | YES | STRING | NEVER | | | NO | | NO | NO | |
| 113 | aqueduct-production | ds_metrics | dse_new_customers_daily | download_date | 2 | YES | DATE | NEVER | | | NO | | NO | NO | |
| 114 | aqueduct-production | ds_metrics | dse_new_customers_daily | count | 3 | YES | INT64 | NEVER | | | NO | | NO | NO | |


<a id='step2'></a>
### 2. ETL BQ Schema to Astra Payload

###### [back to the top](#theTop)

#### Dict 1: Mapping Data Types from GCP to Cassandra

In [7]:
bq_schema["data_type"].unique()

array(['STRING', 'DATE', 'INT64'], dtype=object)

In [8]:
schema_bq_to_c = {"STRING": "text",
                  "DATE": "date",
                  "INT64": "bigint"
                 }
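
The dictionary above covers only the three types found in this table. A minimal sketch of a lookup that fails loudly on any other BigQuery type, rather than with a bare `KeyError`, might look like this; `map_bq_type` is a hypothetical helper, and the mapping is copied from the cell above.

```python
# Same mapping as the notebook's schema_bq_to_c
SCHEMA_BQ_TO_C = {"STRING": "text", "DATE": "date", "INT64": "bigint"}

def map_bq_type(bq_type, mapping=SCHEMA_BQ_TO_C):
    """Translate a BigQuery type name to its Cassandra type, or explain what is missing."""
    try:
        return mapping[bq_type.upper()]
    except KeyError:
        raise ValueError(
            f"No Cassandra type mapped for BigQuery type {bq_type!r}; "
            "add it to the mapping first"
        ) from None

mapped = map_bq_type("INT64")
```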

#### Dict 2: Table Definition Payload

In [9]:
# create a dict of the required table structure for the schema
table_definition = {"name": "",
                    "ifNotExists": True,
                    "columnDefinitions": [],
                    "primaryKey": {"partitionKey": [],
                                   "clusteringKey": []},
                    "tableOptions": {"defaultTimeToLive":0}
                   }

##### ETL BQ Table Schema to Astra Payload

In [10]:
def create_table_schema(table_name):
    """
        Take the name of the BQ table
        Create the table definition, with its partition key
        The partition key will be the table name (yes, this is bad)
    """
    # create new table_definition from the template
    table_schema = deepcopy(table_definition)
    table_schema["name"] = table_name
    
    # set the table name as the partition key
    # (col_def is the column template from Dict 3; it must be defined before this is called)
    partition_key = deepcopy(col_def)
    partition_key["name"] = table_name
    partition_key["pos"] = 0
    partition_key["typeDefinition"] = "text"
    
    # add a column to the table def for the part key
    table_schema["columnDefinitions"].append(partition_key)
    
    # set the table's part key to the table name
    table_schema["primaryKey"]["partitionKey"].append(table_name)
    return table_schema

In [11]:
def create_clustering_keys(table_name, astra_schema):
    """
        Set the clustering columns for each table in the Astra schema,
        ordered by their ordinal position in the BQ schema
        Then drop the temporary "pos" field from every column definition
    """
    for key, val in astra_schema.items():
        ### set clustering columns
        clusteringKey = []

        # sort clustering keys based on ordinal pos
        newlist = sorted(val["columnDefinitions"], key=lambda k: k['pos']) 

        for col in newlist:
            if col["pos"] > 0: # (We don't want to repeat the partition key!)
                clusteringKey.append(col["name"])

        val["primaryKey"]["clusteringKey"] = clusteringKey
        for col in val["columnDefinitions"]:
            col.pop("pos", None)
    return astra_schema
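
The ordering step can be tried on its own with a hand-built list of column definitions. This is a minimal sketch of the same sort-and-filter idea; `clustering_keys_by_position` is a hypothetical helper name, and the sample columns mirror this table's schema.

```python
def clustering_keys_by_position(column_definitions):
    """Return column names sorted by 'pos', skipping the partition key at pos 0."""
    ordered = sorted(column_definitions, key=lambda col: col["pos"])
    return [col["name"] for col in ordered if col["pos"] > 0]

# Deliberately out of order to show the sort doing its job
columns = [
    {"name": "count", "pos": 3},
    {"name": "dse_new_customers_daily", "pos": 0},  # the partition key
    {"name": "product", "pos": 1},
    {"name": "download_date", "pos": 2},
]
keys = clustering_keys_by_position(columns)
print(keys)  # ['product', 'download_date', 'count']
```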

#### Dict 3: Column Definition Payload

In [12]:
# create a dict of the required column structure for the schema
col_def = {"name":"",
           "typeDefinition":"",
           "static": False,
           "pos": ""}

##### ETL BQ Column Schema to Astra Payload

In [13]:
def create_col_schema(row):
    """
        Take a row from the BQ schema dataframe
        Create the column definition for that row
        Keep the ordinal position so clustering keys can be ordered later
    """
    new_col = deepcopy(col_def)
    new_col["name"] = row["column_name"]
    new_col["typeDefinition"] = schema_bq_to_c[row["data_type"]]
    new_col["pos"] = row["ordinal_position"]  # we will use this later
    return new_col

#### Putting It All Together:

In [15]:
def transform_bq_schema(table_name, bq_schema):
    """
        Create a table definition dictionary
        Set up the partition key
        For all rows in the bq_schema dataframe
            Create column definitions
    """
    astra_schema = {}
    astra_schema[table_name] = create_table_schema(table_name)
    for index, row in bq_schema.iterrows():
        # create new column definition for all other columns
        new_col = create_col_schema(row)
        # append to astra_tables[table_name][columnDefinitions]
        astra_schema[table_name]["columnDefinitions"].append(new_col)
    astra_schema = create_clustering_keys(table_name, astra_schema)
    return astra_schema

In [17]:
astra_schema = transform_bq_schema(TABLE, bq_schema)
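# pprint() prints the dict and returns None, so the debug line logs "None";
# pass pprint.pformat(...) to logging.debug instead to log the formatted dict itself
logging.debug(pprint(astra_schema["dse_new_customers_daily"]))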

DEBUG:root:None


{'columnDefinitions': [{'name': 'dse_new_customers_daily',
                        'static': False,
                        'typeDefinition': 'text'},
                       {'name': 'product',
                        'static': False,
                        'typeDefinition': 'text'},
                       {'name': 'download_date',
                        'static': False,
                        'typeDefinition': 'date'},
                       {'name': 'count',
                        'static': False,
                        'typeDefinition': 'bigint'}],
 'ifNotExists': True,
 'name': 'dse_new_customers_daily',
 'primaryKey': {'clusteringKey': ['product', 'download_date', 'count'],
                'partitionKey': ['dse_new_customers_daily']},
 'tableOptions': {'defaultTimeToLive': 0}}
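
To make the payload easier to read, here is a sketch that renders a table definition as a roughly equivalent CQL `CREATE TABLE` statement. `payload_to_cql` is a hypothetical helper for inspection only: it ignores the `static` flag, does no identifier quoting, and is not how the REST API builds the table.

```python
def payload_to_cql(keyspace, table_def):
    """Render a table-definition payload as an approximate CQL statement."""
    cols = ", ".join(
        f'{c["name"]} {c["typeDefinition"]}' for c in table_def["columnDefinitions"]
    )
    partition = ", ".join(table_def["primaryKey"]["partitionKey"])
    clustering = table_def["primaryKey"]["clusteringKey"]
    # Partition key goes in its own parentheses, clustering columns follow it
    primary = f"({partition})" + "".join(f", {name}" for name in clustering)
    if_not_exists = "IF NOT EXISTS " if table_def.get("ifNotExists") else ""
    ttl = table_def.get("tableOptions", {}).get("defaultTimeToLive", 0)
    return (
        f"CREATE TABLE {if_not_exists}{keyspace}.{table_def['name']} "
        f"({cols}, PRIMARY KEY ({primary})) "
        f"WITH default_time_to_live = {ttl};"
    )

# The payload printed above, written out by hand
table_def = {
    "name": "dse_new_customers_daily",
    "ifNotExists": True,
    "columnDefinitions": [
        {"name": "dse_new_customers_daily", "static": False, "typeDefinition": "text"},
        {"name": "product", "static": False, "typeDefinition": "text"},
        {"name": "download_date", "static": False, "typeDefinition": "date"},
        {"name": "count", "static": False, "typeDefinition": "bigint"},
    ],
    "primaryKey": {
        "partitionKey": ["dse_new_customers_daily"],
        "clusteringKey": ["product", "download_date", "count"],
    },
    "tableOptions": {"defaultTimeToLive": 0},
}
cql = payload_to_cql("ds_metrics", table_def)
print(cql)
```

Reading the statement makes the design trade-off visible: every row shares one partition key value, so the whole table lives in a single partition.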


##### Bonus Stuff: Write tests as you go!

In [18]:
## two tests to make sure we transformed BQ Schema to Astra Schema correctly
class TestListElements(unittest.TestCase):
    def setUp(self):
        self.astra = list(astra_schema.keys())
        self.bq = list(bq_schema["table_name"].unique())

    def test_count_eq(self):
        self.assertCountEqual(self.astra, self.bq)

    def test_list_eq(self):
        self.assertListEqual(self.astra, self.bq)

unittest.main(argv=[''], verbosity=0, exit=False)


----------------------------------------------------------------------
Ran 2 tests in 0.001s

OK


<unittest.main.TestProgram at 0x109b23da0>

<a id='step3'></a>
### 3. Create Astra Table
[Docs for payload structure are here](https://astra.readme.io/docs/creating-a-table-in-your-keyspace)


###### [back to the top](#theTop)

In [19]:
def get_token():
    """
        This helper function uses the auth REST API to get an access token
        returns: an auth token; 30 minute expiration
    """
    url = f"https://{DB_ID}-{REGION}.apps.astra.datastax.com/api/rest/v1/auth"
    payload = {"username":USER,
               "password":PASSWORD}
    headers = {'accept': '*/*',
               'content-type': 'application/json',
               'x-cassandra-request-id': UUID}
    # make auth request to Astra
    r = requests.post(url, 
                     data=json.dumps(payload), 
                     headers=headers)
    # raise any authentication error
    if r.status_code != 201:
        raise Exception(r.text)   
    # extract and return the auth token 
    data = json.loads(r.text)
    return data["authToken"]
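
Because the token expires after 30 minutes, a long migration may need a fresh one partway through. The sketch below caches a token and refetches it shortly before expiry. `TokenCache`, its parameters, and the one-minute safety margin are assumptions for illustration; only the 30-minute lifetime comes from the docstring above.

```python
import time

class TokenCache:
    """Hold a token and refetch it shortly before it would expire."""

    def __init__(self, fetch, ttl_seconds=30 * 60, margin_seconds=60, clock=time.monotonic):
        self._fetch = fetch            # zero-argument callable returning a new token
        self._ttl = ttl_seconds        # assumed token lifetime
        self._margin = margin_seconds  # refresh this long before expiry
        self._clock = clock            # injectable so the logic can be tested
        self._token = None
        self._expires_at = 0.0

    def get(self):
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            self._token = self._fetch()
            self._expires_at = now + self._ttl
        return self._token

# Demonstration with a fake fetcher and a fake clock (no network calls)
calls = []
def fake_fetch():
    calls.append(1)
    return f"token-{len(calls)}"

now = [0.0]
cache = TokenCache(fake_fetch, clock=lambda: now[0])
first = cache.get()
now[0] = 100.0
second = cache.get()   # still fresh, no refetch
now[0] = 1750.0
third = cache.get()    # inside the safety margin, refetched
```

In the notebook, `TokenCache(get_token)` would wrap the helper above, and each request would read `cache.get()` instead of the global `token`.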

In [20]:
def create_table(table_def):
    """
        Post the table_def dictionary to the Astra REST API
        Return the response for downstream logging
    """
    url = f'https://{DB_ID}-{REGION}.apps.astra.datastax.com/api/rest/v1/keyspaces/{KYSPC}/tables'
    headers = {'accept': '*/*',
               'content-type': 'application/json',
               'x-cassandra-request-id': UUID,
               'x-cassandra-token': token} 
    r = requests.post(url, 
                 data=json.dumps(table_def),
                 headers=headers)
    # raise any table-creation error
    if r.status_code != 201:
        logging.debug(r.status_code)
        raise Exception(r.text) 
    return r

In [21]:
token = get_token()
result = create_table(astra_schema[TABLE])
logging.debug(result.text)

DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): 8aa4160b-0096-4061-8cd9-e17ffa74b595-us-east1.apps.astra.datastax.com:443
DEBUG:urllib3.connectionpool:https://8aa4160b-0096-4061-8cd9-e17ffa74b595-us-east1.apps.astra.datastax.com:443 "POST /api/rest/v1/auth HTTP/1.1" 201 52
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): 8aa4160b-0096-4061-8cd9-e17ffa74b595-us-east1.apps.astra.datastax.com:443
DEBUG:urllib3.connectionpool:https://8aa4160b-0096-4061-8cd9-e17ffa74b595-us-east1.apps.astra.datastax.com:443 "POST /api/rest/v1/keyspaces/ds_metrics/tables HTTP/1.1" 201 16
DEBUG:root:{"success":true}


<a id='step4'></a>

### 4. ETL Data from Big Query to Astra Payloads

###### [back to the top](#theTop)

#### Get the Data from Big Query

In [22]:
### Get data from BQ
def get_data_from_BQ(project, dataset, table):
    """
        Helper function that pulls all data from BQ for table
        returns: the data as a dataframe
    """
    query = f'SELECT * FROM {project}.{dataset}.{table}'
    df = client.query(query).to_dataframe()
    return df

In [23]:
data = get_data_from_BQ(PROJECT, DATASET, TABLE) # data frame of data from BQ
data

DEBUG:urllib3.connectionpool:Resetting dropped connection: bigquery.googleapis.com
DEBUG:urllib3.connectionpool:https://bigquery.googleapis.com:443 "POST /bigquery/v2/projects/aqueduct-production/jobs HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:https://bigquery.googleapis.com:443 "GET /bigquery/v2/projects/aqueduct-production/queries/f9a62baf-1837-4df4-98b2-275282f096c6?maxResults=0&location=US HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:https://bigquery.googleapis.com:443 "GET /bigquery/v2/projects/aqueduct-production/jobs/f9a62baf-1837-4df4-98b2-275282f096c6?location=US HTTP/1.1" 200 None
DEBUG:google.cloud.bigquery.table:Started reading table 'aqueduct-production._a645f1a52b3512d886eab733a954fd298a614be6.anon26d6fa33_2a8e_4e9e_8e58_dddc5f445e67' with tabledata.list.
DEBUG:urllib3.connectionpool:https://bigquery.googleapis.com:443 "GET /bigquery/v2/projects/aqueduct-production/datasets/_a645f1a52b3512d886eab733a954fd298a614be6/tables/anon26d6fa33_2a8e_4e9e_8e58_dddc5f445e67/d

| | product | download_date | count |
|---|---|---|---|
| 0 | DataStax Enterprise-VER_UNK | 2020-02-05 | 1 |
| 1 | DataStax Enterprise-VER_UNK | 2020-01-28 | 1 |
| 2 | DataStax Enterprise-6.8 | 2020-07-16 | 28 |
| 3 | DataStax Enterprise-6.8 | 2020-07-15 | 18 |
| 4 | DataStax Enterprise-6.8 | 2020-07-14 | 38 |
| ... | ... | ... | ... |
| 1965 | DataStax Enterprise-1.0 | 2020-01-17 | 1 |
| 1966 | DataStax Enterprise-1.0 | 2020-01-16 | 3 |
| 1967 | DataStax Enterprise-1.0 | 2020-01-05 | 2 |
| 1968 | DataStax Enterprise-1.0 | 2020-01-03 | 1 |


#### ETL the Data to Payloads for Astra

In [24]:
# ETL from BQ dataframe to Astra payloads of {name: foo, value: bar} 
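def transform_data(table, keys, data):
    """
        table: the table name, used as both the name and value of the partition key
        keys: the column names
        data: dataframe of BQ data
        returns: one list per row of payloads {"name":"name0","value":"value0"}
        note: every value is converted to a lowercase string
    """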
    inserts = []
    for index, row in data.iterrows():
        all_data = []
        # the partition key
        data_dict = {}
        data_dict["name"] = table
        data_dict["value"] = table
        all_data.append(data_dict)
        for key in keys:
            data_dict = {}
            data_dict["name"] = str(key).lower()
            data_dict["value"] = str(row[key]).lower()
            all_data.append(data_dict)
        inserts.append(all_data)
    return inserts
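
The same transformation can be exercised without a BigQuery connection by feeding it plain dictionaries. This is a minimal sketch under that assumption; `rows_to_payloads` is a hypothetical stand-in that keeps the lowercase-string behavior of the function above.

```python
def rows_to_payloads(table, rows):
    """Turn a list of {column: value} dicts into Astra column payloads."""
    payloads = []
    for row in rows:
        # The partition key column comes first, named after the table
        columns = [{"name": table, "value": table}]
        for key, value in row.items():
            columns.append({"name": str(key).lower(), "value": str(value).lower()})
        payloads.append(columns)
    return payloads

# First row of the BQ table, written out by hand
sample_rows = [
    {"product": "DataStax Enterprise-VER_UNK", "download_date": "2020-02-05", "count": 1}
]
payloads = rows_to_payloads("dse_new_customers_daily", sample_rows)
```

Note that lowercasing changes the stored product names, for example `DataStax Enterprise-VER_UNK` becomes `datastax enterprise-ver_unk`.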

In [25]:
inserts = transform_data(TABLE, list(data.columns), data)
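# pprint() prints the list and returns None, so the debug line logs "None";
# pass pprint.pformat(...) to logging.debug instead to log the formatted payload itself
logging.debug(pprint(inserts[0]))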

DEBUG:root:None


[{'name': 'dse_new_customers_daily', 'value': 'dse_new_customers_daily'},
 {'name': 'product', 'value': 'datastax enterprise-ver_unk'},
 {'name': 'download_date', 'value': '2020-02-05'},
 {'name': 'count', 'value': '1'}]


##### BONUS! ETL Tests

In [26]:
logging.debug(len(data.values.tolist()))

DEBUG:root:1970


In [27]:
## one test to make sure every BQ row produced exactly one Astra payload
class TestListElements(unittest.TestCase):
    def setUp(self):
        self.astra = inserts
        self.bq = data.values.tolist()

    def test_count_eq(self):
        self.assertEqual(len(self.astra), len(self.bq))

unittest.main(argv=[''], verbosity=0, exit=False) 

----------------------------------------------------------------------
Ran 1 test in 0.001s

OK


<unittest.main.TestProgram at 0x11eb97be0>

<a id='step5'></a>
### 5. Post data Payloads to Astra

###### [back to the top](#theTop)

In [28]:
def post_data_to_astra(table, payload):
    """
        Post the payload to insert into the table
    """
    url = f"https://{DB_ID}-{REGION}.apps.astra.datastax.com/api/rest/v1/keyspaces/{KYSPC}/tables/{table}/rows"
    headers = {'accept': 'application/json',
               'content-type': 'application/json',
               'x-cassandra-request-id': UUID,
               'x-cassandra-token': token}
    r = requests.post(url, 
                      data=json.dumps(payload),
                      headers=headers)
    # raise any errors
    if r.status_code != 201:
        logging.debug(r.status_code)
        raise Exception(r.text)
    return r

In [29]:
def insert_data(table, inserts):
    """
        Post each payload in inserts to the Astra table, one row per request
        Log progress every 10 rows, then the inserted and failed totals
    """
    num_records = 0
    num_failed = 0
    logging.info(f'Expecting to insert: {len(inserts)} total rows into table {table}')
    for data in inserts:
        total = num_records + num_failed
        if total > 0 and total % 10 == 0:
            logging.info(f'\t Processed {total} rows ...')
        payload = {"columns": data}
        try:
            # post_data_to_astra raises on any non-201 response,
            # so a failed row is counted here instead of stopping the loop
            post_data_to_astra(table.lower(), payload)
            num_records += 1
        except Exception as err:
            logging.debug(err)
            num_failed += 1
    logging.info("Done!")
    logging.info(f'\t Inserted: {num_records} total rows')
    logging.info(f'\t Failed to insert: {num_failed} total rows')

In [30]:
logger.setLevel(logging.INFO)
insert_data(TABLE, inserts)

INFO:root:Expecting to insert: 1970 total rows into table dse_new_customers_daily
INFO:root:	 Processed 10 rows ...
INFO:root:	 Processed 20 rows ...
INFO:root:	 Processed 30 rows ...
INFO:root:	 Processed 40 rows ...
INFO:root:	 Processed 50 rows ...
INFO:root:	 Processed 60 rows ...
INFO:root:	 Processed 70 rows ...
INFO:root:	 Processed 80 rows ...
INFO:root:	 Processed 90 rows ...
INFO:root:	 Processed 100 rows ...
INFO:root:	 Processed 110 rows ...
INFO:root:	 Processed 120 rows ...
INFO:root:	 Processed 130 rows ...
INFO:root:	 Processed 140 rows ...
INFO:root:	 Processed 150 rows ...
INFO:root:	 Processed 160 rows ...
INFO:root:	 Processed 170 rows ...
INFO:root:	 Processed 180 rows ...
INFO:root:	 Processed 190 rows ...
INFO:root:	 Processed 200 rows ...
INFO:root:	 Processed 210 rows ...
INFO:root:	 Processed 220 rows ...
INFO:root:	 Processed 230 rows ...
INFO:root:	 Processed 240 rows ...
INFO:root:	 Processed 250 rows ...
INFO:root:	 Processed 260 rows ...
INFO:root:	 Proce

### Looking at the whole process end-to-end

In [None]:
# THE WHOLE THING!
logger.setLevel(logging.INFO)

# 2. ETL BQ Schema to Astra Schema
bq_schema = get_bq_schema("aqueduct-production", "ds_metrics", "dse_new_customers_daily")
astra_schema = transform_bq_schema("dse_new_customers_daily", bq_schema)

# 3. Create Table in Astra
token = get_token()
result = create_table(astra_schema["dse_new_customers_daily"])

# 4. ETL Data from BQ to Astra payloads
data = get_data_from_BQ("aqueduct-production", "ds_metrics", "dse_new_customers_daily")
inserts = transform_data("dse_new_customers_daily", list(data.columns), data)

# 5. Post payloads to Astra
insert_data("dse_new_customers_daily", inserts)

In [None]:
## DONE! Thank you!