In [3]:
import pandas as pd
import os
import json
from api_library import BQConnect
from api_library import StorageConnect
from google.cloud import bigquery
import logging
from io import StringIO

# Grabbing data from blob
storage_conn = StorageConnect()

try:
    
    logging.info(f'Reading {storage_conn.sm_raw_responses_blob.name} from {storage_conn.raw_data_bucket.name}...')

    # get json string
    json_data_string = storage_conn.sm_raw_responses_blob.download_as_text()

    responses_data_ls = json.loads(json_data_string)
    
    logging.info(f'Successfully loaded {storage_conn.sm_raw_responses_blob.name} from GCP Storage.')

except Exception as e:
    logging.error(f'Failed to load JSON from GCP Storage: {e}')
    raise

INFO:root:GCP credentials found in environment variable: C:\Users\edohner\OneDrive - Lyric Opera of Chicago\Desktop\Python Learning\airflow_test_project\gcp_service_account.json
INFO:root:GCP credentials JSON loaded successfully.
INFO:root:Using project ID from JSON credentials: dbt-test-449821
INFO:root:Reading sm_raw_responses_string from airflow_raw_source_data...
INFO:root:Successfully loaded sm_raw_responses_string from GCP Storage.


In [4]:
# transforming data
    
# Initialize list to hold full response dfs as we loop through
df_list = []

# Normalize the JSON
for response_dict in responses_data_ls:
    df = pd.json_normalize(response_dict['pages'][0]['questions'], 
                        record_path=['answers'], 
                        meta='id',
                        meta_prefix='question_'
    )
        
    # Create a dict of col names and their respective values, represented by the dict path in the each answer dict
    meta_assignments = {
        'respondent_id': 'id',
        'collector_id': 'collector_id',
        'survey_id': 'survey_id',
        'date': 'date_modified',
        'const_id': 'custom_variables.customer_no',
        'email': 'custom_variables.email',
        'performance_code': 'custom_variables.perf',
        'production_name': 'custom_variables.prod_name'
    }
    
    # Create function to loop through each level of the specificed values path to find the value at the 
    # end of that path (as everything else in the path is just dict --> dict -->dict)
    def get_nested_path(d, path):
        keys = path.split('.')
        for key in keys:
            d = d.get(key, None) 
            if not isinstance(d, dict):
                break
        return d
    
    # Apply the function to the assignments dict using dict comprehension --> creates new dict
    # assign key,values from new dict to existing df
    # --> unpacking the dict will return keyword args, formatted as key=value
    # --> we can then pass these keyword args to the .assign(**kwargs) as it looking for an indefinit number of keyword args
    df = df.assign(**{col: get_nested_path(response_dict, value) for col, value in meta_assignments.items()})

    # append this responses data -- now stored in df -- to the df_list to be combined with all responses 
    df_list.append(df)
    
# combine all the dfs in the df_list
norm_answers = pd.concat(df_list)

norm_answers


Unnamed: 0,choice_id,row_id,choice_metadata.weight,tag_data,text,question_id,respondent_id,collector_id,survey_id,date,const_id,email,performance_code,production_name,other_id
0,1492773489,1492773479,100,,,208013575,118802733914,459617762,520007028,2025-02-17T16:00:03+00:00,516434,nnicketakis@ccim.net,SON02,Sondra Radvanovsky in Concert: The Puccini Her...,
1,1492773508,1492773498,100,,,208013576,118802733914,459617762,520007028,2025-02-17T16:00:03+00:00,516434,nnicketakis@ccim.net,SON02,Sondra Radvanovsky in Concert: The Puccini Her...,
2,1492773474,,,,,208013572,118802733914,459617762,520007028,2025-02-17T16:00:03+00:00,516434,nnicketakis@ccim.net,SON02,Sondra Radvanovsky in Concert: The Puccini Her...,
3,,,,[],It was all good...a truly wonderful evening!,208013573,118802733914,459617762,520007028,2025-02-17T16:00:03+00:00,516434,nnicketakis@ccim.net,SON02,Sondra Radvanovsky in Concert: The Puccini Her...,
4,,,,[],"If there's a recording of this concert, I'd ge...",208013578,118802733914,459617762,520007028,2025-02-17T16:00:03+00:00,516434,nnicketakis@ccim.net,SON02,Sondra Radvanovsky in Concert: The Puccini Her...,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3,,,,[],The conductor Nicholas Buc was fantastic! The...,208013573,118804755987,459617692,520007028,2025-02-20T00:21:31+00:00,1326141,meowmeow336@yahoo.com,SING,Singin' in the Rain,
4,,,,[],My ticket wouldn't open up on my phone. I have...,208013574,118804755987,459617692,520007028,2025-02-20T00:21:31+00:00,1326141,meowmeow336@yahoo.com,SING,Singin' in the Rain,
5,1492773532,,,,,208013584,118804755987,459617692,520007028,2025-02-20T00:21:31+00:00,1326141,meowmeow336@yahoo.com,SING,Singin' in the Rain,
6,1492773535,,,,,208013585,118804755987,459617692,520007028,2025-02-20T00:21:31+00:00,1326141,meowmeow336@yahoo.com,SING,Singin' in the Rain,


In [5]:
# sending to GCP Cloud Storage as csv
try:
    buffer = StringIO()
    norm_answers.to_csv(buffer, index=False)
    buffer.seek(0)
    
    storage_conn.sm_normalized_responses_blob.upload_from_string(buffer.getvalue(), content_type="text/csv")
    
    logging.info(f"Successfully uploaded {storage_conn.sm_normalized_responses_blob.name} to {storage_conn.raw_data_bucket.name}.")

except Exception as e:
    logging.error(f"Failed to upload DataFrame to GCS: {e}")
    raise 


INFO:root:Successfully uploaded normalized_data.csv to airflow_raw_source_data.


In [6]:
# getting the CVS from GCP Storage

logging.basicConfig(level=logging.INFO)


# Initiate the StorageConnect object
storage_conn = StorageConnect()

# Get normalized.csv
try:
    normalized_string_data = storage_conn.sm_normalized_responses_blob.download_as_text()
    if normalized_string_data:
        logging.info(f'Retrieved normalized responses csv from blob: {storage_conn.sm_normalized_responses_blob.name}, project: {storage_conn.project_id}')
    else: 
        raise RuntimeError('Downloaded file is empty')
except Exception as e:
    logging.error(f"Failed to retrieve normalized responses CSV from blob: {storage_conn.sm_normalized_responses_blob.name}, project: {storage_conn.project_id}. Error: {e}", exc_info=True)


INFO:root:Retrieved normalized responses csv from blob: normalized_data.csv, project: dbt-test-449821


In [None]:
# sending the transformed data to bigquery

# create dataframe
normalized_df = pd.read_csv(StringIO(normalized_string_data))

# Initiate the BigQueryConnect object
bq_conn = BQConnect()

# Fixing datatypes
normalized_df['date'] = pd.to_datetime(normalized_df['date'], errors='coerce')

str_columns = [ "choice_id", "row_id", "choice_metadata.weight", "question_id",
    "respondent_id", "collector_id", "survey_id", "const_id",
    "email", "performance_code", "production_name", "tag_data",
    "text", "other_id"]

normalized_df[str_columns] = normalized_df[str_columns].astype(str)

# define project_id, dataset_id and table_id
dataset_id = bq_conn.dataset_pipeline
table_id = f'{dataset_id}.raw_sm_responses'

# renaming columns
normalized_df.rename(columns={
    'choice_metadata.weight': 'choice_metadata_weight'
}, inplace=True)

# configure the specific table we're sending the df to in bigquery
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("choice_id", "STRING"),
        bigquery.SchemaField("row_id", "STRING"),
        bigquery.SchemaField("choice_metadata_weight", "STRING"),
        bigquery.SchemaField("question_id", "STRING"),
        bigquery.SchemaField("respondent_id", "STRING"),
        bigquery.SchemaField("collector_id", "STRING"),
        bigquery.SchemaField("survey_id", "STRING"),
        bigquery.SchemaField("date", "TIMESTAMP"),
        bigquery.SchemaField("const_id", "STRING"),
        bigquery.SchemaField("email", "STRING"),
        bigquery.SchemaField("performance_code", "STRING"),
        bigquery.SchemaField("production_name", "STRING"),
        bigquery.SchemaField("tag_data", "STRING"),
        bigquery.SchemaField("text", "STRING"),
        bigquery.SchemaField("other_id", "STRING"),
    ],
    write_disposition="WRITE_TRUNCATE"
)

# load the table to bigquery
load_job = bq_conn.client.load_table_from_dataframe(
    normalized_df, table_id, job_config=job_config
)

logging.info(f'Loading normalized responses to BigQuery, table: {table_id}')

# wait for the result
load_job.result()  

# Check job status and log
if load_job.state == 'DONE':
    if load_job.error_result:
        logging.error(f"Job failed with error: {load_job.error_result}")
    else:
        logging.info(f"Job completed successfully with {load_job.output_rows} rows loaded.")
else:
    logging.warning(f"Job state: {load_job.state}")

ERROR:google.cloud.bigquery._pandas_helpers:Error converting Pandas column with name: "choice_id" and datatype: "float64" to an appropriate pyarrow datatype: Array, ListArray, or StructArray


ArrowTypeError: Error converting Pandas column with name: "choice_id" and datatype: "float64" to an appropriate pyarrow datatype: Array, ListArray, or StructArray